Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Activating and Deactivating File Inbound Endpoints #2261

Merged
merged 2 commits into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.synapse.commons.handlers.MessagingHandler;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.mediators.Value;
import org.apache.synapse.registry.Registry;
import org.apache.synapse.util.xpath.SynapseXPath;
import org.jaxen.JaxenException;
import org.wso2.securevault.SecretResolver;
Expand All @@ -41,6 +42,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.regex.Matcher;
Expand All @@ -59,27 +61,29 @@ public class InboundEndpoint implements AspectConfigurable, ManagedLifecycle {
private boolean isSuspend;
private String injectingSeq;
private String onErrorSeq;
private boolean startInPausedMode;
private Map<String, String> parametersMap = new LinkedHashMap<String, String>();
private Map<String, String> parameterKeyMap = new LinkedHashMap<String, String>();
private List<MessagingHandler> handlers = new ArrayList();
private String fileName;
private SynapseEnvironment synapseEnvironment;
private InboundRequestProcessor inboundRequestProcessor;
private Registry registry;
/** car file name which this endpoint deployed from */
private String artifactContainerName;
/** Whether the deployed inbound endpoint is edited via the management console */
private boolean isEdited;
private AspectConfiguration aspectConfiguration;
/** regex for any vault expression */
private static final String secureVaultRegex = "\\{(.*?):vault-lookup\\('(.*?)'\\)\\}";
private static final String REG_INBOUND_ENDPOINT_BASE_PATH = "/repository/components/org.apache.synapse.inbound/";
private static final String INBOUND_ENDPOINT_STATE = "INBOUND_ENDPOINT_STATE";

public void init(SynapseEnvironment se) {
log.info("Initializing Inbound Endpoint: " + getName());
synapseEnvironment = se;
if(isSuspend){
log.info("Inbound endpoint " + name + " is currently suspended.");
return;
}
registry = se.getSynapseConfiguration().getRegistry();
startInPausedMode = startInPausedMode();
inboundRequestProcessor = getInboundRequestProcessor();
if (inboundRequestProcessor != null) {
try {
Expand All @@ -91,10 +95,11 @@ public void init(SynapseEnvironment se) {
}
} else {
String msg = "Inbound Request processor not found for Inbound EP : " + name +
" Protocol: " + protocol + " Class" + classImpl;
" Protocol: " + protocol + " Class" + classImpl;
log.error(msg);
throw new SynapseException(msg);
}

}

/**
Expand Down Expand Up @@ -140,6 +145,7 @@ private InboundProcessorParams populateParams() {
inboundProcessorParams.setInjectingSeq(injectingSeq);
inboundProcessorParams.setOnErrorSeq(onErrorSeq);
inboundProcessorParams.setSynapseEnvironment(synapseEnvironment);
inboundProcessorParams.setStartInPausedMode(startInPausedMode);

Properties props = Utils.paramsToProperties(parametersMap);
//replacing values by secure vault
Expand Down Expand Up @@ -232,6 +238,87 @@ public void setOnErrorSeq(String onErrorSeq) {
this.onErrorSeq = onErrorSeq;
}

/**
* Activates the inbound endpoint.
* <p>
* This method synchronizes access to ensure thread safety while activating the inbound endpoint.
* It calls the underlying {@link InboundRequestProcessor} to perform the activation logic.
* If the activation is successful, updates the inbound endpoint's state in the registry
* to {@link InboundEndpointState#ACTIVE}
* </p>
*/
public synchronized boolean activate() {

if (Objects.isNull(this.inboundRequestProcessor)) {
log.error("Unable to activate the Inbound Endpoint [" + getName() + "] because "
+ "no associated inbound request processor was found!");
}

log.info("Activating the Inbound Endpoint: " + getName());
String errorMessage = "Failed to activate the Inbound Endpoint: " + getName();
try {
if (this.inboundRequestProcessor.activate()) {
log.info("Inbound Endpoint [" + getName() + "] is successfully activated.");
setInboundEndpointStateInRegistry(InboundEndpointState.ACTIVE);
return true;
} else {
log.error(errorMessage);
}
} catch (Exception e) {
log.error(errorMessage, e);
}
return false;
}

/**
* Deactivates the inbound endpoint.
* <p>
* This method synchronizes access to ensure thread safety while deactivating the inbound endpoint.
* It calls the underlying {@link InboundRequestProcessor} to perform the deactivation logic.
* If the deactivation is successful, the method updates the inbound endpoint's state in the
* registry to {@link InboundEndpointState#INACTIVE}.
* </p>
*/
public synchronized boolean deactivate() {

if (Objects.isNull(this.inboundRequestProcessor)) {
log.error("Unable to deactivate the Inbound Endpoint [" + getName() + "] because "
+ "no associated inbound request processor was found!");
}

log.info("Deactivating the Inbound Endpoint: " + getName());
String errorMessage = "Failed to deactivate the Inbound Endpoint: " + getName();
try {
if (this.inboundRequestProcessor.deactivate()) {
log.info("Inbound Endpoint [" + getName() + "] is successfully deactivated.");
setInboundEndpointStateInRegistry(InboundEndpointState.INACTIVE);
return true;
} else {
log.error(errorMessage);
}
} catch (Exception e) {
log.error(errorMessage, e);
}
return false;
}

/**
* Checks whether the inbound endpoint is deactivated.
* <p>
* This method delegates the check to the underlying {@link InboundRequestProcessor},
* which determines the deactivation state of the inbound endpoint.
* </p>
*
* @return {@code true} if the inbound endpoint is deactivated; {@code false} otherwise.
*/
public boolean isDeactivated() {

if (Objects.isNull(this.inboundRequestProcessor)) {
return true;
}
return inboundRequestProcessor.isDeactivated();
}

public String getFileName() {
return fileName;
}
Expand Down Expand Up @@ -353,4 +440,120 @@ public void addHandler(MessagingHandler handler) {

this.handlers.add(handler);
}

/**
* Updates the state of the Inbound Endpoint in the registry.
*
* <p>This method ensures that the state of the Inbound Endpoint is persisted in
* the registry for future reference. If the registry is unavailable and state
* preservation is enabled, a warning is logged, and the state will not be updated.
*
* @param state the {@link InboundEndpointState} to be saved in the registry
*/
private void setInboundEndpointStateInRegistry(InboundEndpointState state) {
if (Objects.isNull(registry)) {
log.warn("Registry not available! The state of the Inbound Endpoint will not be saved.");
return;
}
registry.newNonEmptyResource(REG_INBOUND_ENDPOINT_BASE_PATH + getName(), false, "text/plain",
state.toString(), INBOUND_ENDPOINT_STATE);
}

/**
* Deletes the state of the Inbound Endpoint from the registry.
*
* <p>This method removes the registry entry corresponding to the current
* Inbound Endpoint's state, if it exists. If the registry is unavailable,
* the operation is skipped.
*/
private void deleteInboundEndpointStateInRegistry() {
if (Objects.isNull(registry)) {
return;
}
if (registry.getResourceProperties(REG_INBOUND_ENDPOINT_BASE_PATH + getName()) != null) {
registry.delete(REG_INBOUND_ENDPOINT_BASE_PATH + getName());
}
}

/**
* Determines whether the inbound endpoint should start in paused mode.
*
* This method evaluates the `preserveState` flag and the current state of the inbound endpoint
* to decide if the endpoint should start in a paused state.
*
* - If `preserveState` is false or if the current state in the registry is {@link InboundEndpointState#INITIAL},
* it returns the value of `suspend` attribute in the inbound endpoint configuration`.
* - Otherwise, it checks if the current state is {@link InboundEndpointState#INACTIVE}
* and returns `true` if it is, indicating the endpoint should start in paused mode.
*
* @return {@code true} if the inbound endpoint should start in paused mode, {@code false} otherwise.
*/
private boolean startInPausedMode() {

if (getInboundEndpointStateFromRegistry() == InboundEndpointState.INITIAL) {
return isSuspend();
}
return (getInboundEndpointStateFromRegistry() == InboundEndpointState.INACTIVE);
}

/**
* Retrieves the current state of the inbound endpoint from the registry.
*
* This method checks the registry for the state of the inbound endpoint associated with
* the provided name. It first fetches the resource properties of the inbound endpoint from
* the registry. If no properties are found, the method assumes the state is {@link InboundEndpointState#INITIAL}.
*
* If the state is present, it determines whether the state is {@link InboundEndpointState#ACTIVE}.
* or {@link InboundEndpointState#INACTIVE}.
*
* @return The current state of the inbound endpoint, as either {@link InboundEndpointState#ACTIVE},
* {@link InboundEndpointState#INACTIVE}, or {@link InboundEndpointState#INITIAL} if not explicitly set.
*/
private InboundEndpointState getInboundEndpointStateFromRegistry() {
Properties resourceProperties = null;
if (Objects.nonNull(registry)) {
resourceProperties = registry.getResourceProperties(REG_INBOUND_ENDPOINT_BASE_PATH + getName());
}

if (resourceProperties == null) {
return InboundEndpointState.INITIAL;
}

String state = resourceProperties.getProperty(INBOUND_ENDPOINT_STATE);
if (InboundEndpointState.ACTIVE.toString().equalsIgnoreCase(state)) {
return InboundEndpointState.ACTIVE;
}
return InboundEndpointState.INACTIVE;
}

private enum InboundEndpointState {
INITIAL, ACTIVE, INACTIVE
}

/**
* Updates the state of the inbound endpoint to either paused or active based on the given parameter.
* <p>
* This method attempts to change the state of the inbound endpoint and ensures that the state in
* the registry matches the expected behavior. If the operation fails to align the actual state
* with the requested state, a warning is logged to indicate the potential for inconsistent behavior.
*
* @param pause {@code true} to pause the inbound endpoint, setting its state to {@code INACTIVE};
* {@code false} to resume the inbound endpoint, setting its state to {@code ACTIVE}.
*/
public void updateInboundEndpointState(boolean pause) {

if (Objects.isNull(inboundRequestProcessor)) {
log.error("Unable to update the state of the Inbound Endpoint [" + getName() + "] as it does not exist!");
}
if (pause && inboundRequestProcessor.isDeactivated()) {
setInboundEndpointStateInRegistry(InboundEndpointState.INACTIVE);
} else if (!pause && !inboundRequestProcessor.isDeactivated()){
setInboundEndpointStateInRegistry(InboundEndpointState.ACTIVE);
} else {
log.warn("The inbound endpoint [" + name + "] was requested to change its state to "
+ (pause ? "pause" : "resume") + ", but the operation did not complete successfully "
+ "as the actual state does not match the expected state.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class InboundEndpointConstants {
public static final String INBOUND_ENDPOINT_PROTOCOL = "protocol";
public static final String INBOUND_ENDPOINT_CLASS = "class";
public static final String INBOUND_ENDPOINT_SUSPEND = "suspend";
public static final String INBOUND_ENDPOINT_PRESERVE_STATE = "preserve.state";
public static final String INBOUND_ENDPOINT_SEQUENCE = "sequence";
public static final String INBOUND_ENDPOINT_ERROR_SEQUENCE = "onError";
public static final String INBOUND_ENDPOINT_PARAMETERS = "parameters";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.synapse.core.SynapseEnvironment;

import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
Expand All @@ -36,6 +35,17 @@ public class InboundProcessorParams {
private String onErrorSeq;
private SynapseEnvironment synapseEnvironment;
private List<MessagingHandler> handlers;
private boolean startInPausedMode;

public boolean startInPausedMode() {

return startInPausedMode;
}

public void setStartInPausedMode(boolean startInPausedMode) {

this.startInPausedMode = startInPausedMode;
}

/**
* Get the name of the inbound endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ public interface InboundRequestProcessor {
public void init();

public void destroy();

public boolean activate();

public boolean deactivate();

public boolean isDeactivated();
}
Loading
Loading