diff --git a/modules/core/src/main/java/org/apache/synapse/inbound/InboundEndpoint.java b/modules/core/src/main/java/org/apache/synapse/inbound/InboundEndpoint.java index bd97ef3b97..9d53d866c0 100644 --- a/modules/core/src/main/java/org/apache/synapse/inbound/InboundEndpoint.java +++ b/modules/core/src/main/java/org/apache/synapse/inbound/InboundEndpoint.java @@ -30,6 +30,7 @@ import org.apache.synapse.commons.handlers.MessagingHandler; import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.mediators.Value; +import org.apache.synapse.registry.Registry; import org.apache.synapse.util.xpath.SynapseXPath; import org.jaxen.JaxenException; import org.wso2.securevault.SecretResolver; @@ -41,6 +42,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.ServiceLoader; import java.util.regex.Matcher; @@ -59,12 +61,14 @@ public class InboundEndpoint implements AspectConfigurable, ManagedLifecycle { private boolean isSuspend; private String injectingSeq; private String onErrorSeq; + private boolean startInPausedMode; private Map parametersMap = new LinkedHashMap(); private Map parameterKeyMap = new LinkedHashMap(); private List handlers = new ArrayList(); private String fileName; private SynapseEnvironment synapseEnvironment; private InboundRequestProcessor inboundRequestProcessor; + private Registry registry; /** car file name which this endpoint deployed from */ private String artifactContainerName; /** Whether the deployed inbound endpoint is edited via the management console */ @@ -72,14 +76,14 @@ public class InboundEndpoint implements AspectConfigurable, ManagedLifecycle { private AspectConfiguration aspectConfiguration; /** regex for any vault expression */ private static final String secureVaultRegex = "\\{(.*?):vault-lookup\\('(.*?)'\\)\\}"; + private static final String REG_INBOUND_ENDPOINT_BASE_PATH = "/repository/components/org.apache.synapse.inbound/"; + private static final String INBOUND_ENDPOINT_STATE = "INBOUND_ENDPOINT_STATE"; public void init(SynapseEnvironment se) { log.info("Initializing Inbound Endpoint: " + getName()); synapseEnvironment = se; - if(isSuspend){ - log.info("Inbound endpoint " + name + " is currently suspended."); - return; - } + registry = se.getSynapseConfiguration().getRegistry(); + startInPausedMode = startInPausedMode(); inboundRequestProcessor = getInboundRequestProcessor(); if (inboundRequestProcessor != null) { try { @@ -91,10 +95,11 @@ public void init(SynapseEnvironment se) { } } else { String msg = "Inbound Request processor not found for Inbound EP : " + name + - " Protocol: " + protocol + " Class" + classImpl; + " Protocol: " + protocol + " Class" + classImpl; log.error(msg); throw new SynapseException(msg); } + } /** @@ -140,6 +145,7 @@ private InboundProcessorParams populateParams() { inboundProcessorParams.setInjectingSeq(injectingSeq); inboundProcessorParams.setOnErrorSeq(onErrorSeq); inboundProcessorParams.setSynapseEnvironment(synapseEnvironment); + inboundProcessorParams.setStartInPausedMode(startInPausedMode); Properties props = Utils.paramsToProperties(parametersMap); //replacing values by secure vault @@ -232,6 +238,87 @@ public void setOnErrorSeq(String onErrorSeq) { this.onErrorSeq = onErrorSeq; } + /** + * Activates the inbound endpoint. + *

+ * This method synchronizes access to ensure thread safety while activating the inbound endpoint. + * It calls the underlying {@link InboundRequestProcessor} to perform the activation logic. + * If the activation is successful, updates the inbound endpoint's state in the registry + * to {@link InboundEndpointState#ACTIVE} + *

+ */ + public synchronized boolean activate() { + + if (Objects.isNull(this.inboundRequestProcessor)) { + log.error("Unable to activate the Inbound Endpoint [" + getName() + "] because " + + "no associated inbound request processor was found!"); + } + + log.info("Activating the Inbound Endpoint: " + getName()); + String errorMessage = "Failed to activate the Inbound Endpoint: " + getName(); + try { + if (this.inboundRequestProcessor.activate()) { + log.info("Inbound Endpoint [" + getName() + "] is successfully activated."); + setInboundEndpointStateInRegistry(InboundEndpointState.ACTIVE); + return true; + } else { + log.error(errorMessage); + } + } catch (Exception e) { + log.error(errorMessage, e); + } + return false; + } + + /** + * Deactivates the inbound endpoint. + *

+ * This method synchronizes access to ensure thread safety while deactivating the inbound endpoint. + * It calls the underlying {@link InboundRequestProcessor} to perform the deactivation logic. + * If the deactivation is successful, the method updates the inbound endpoint's state in the + * registry to {@link InboundEndpointState#INACTIVE}. + *

+ */ + public synchronized boolean deactivate() { + + if (Objects.isNull(this.inboundRequestProcessor)) { + log.error("Unable to deactivate the Inbound Endpoint [" + getName() + "] because " + + "no associated inbound request processor was found!"); + } + + log.info("Deactivating the Inbound Endpoint: " + getName()); + String errorMessage = "Failed to deactivate the Inbound Endpoint: " + getName(); + try { + if (this.inboundRequestProcessor.deactivate()) { + log.info("Inbound Endpoint [" + getName() + "] is successfully deactivated."); + setInboundEndpointStateInRegistry(InboundEndpointState.INACTIVE); + return true; + } else { + log.error(errorMessage); + } + } catch (Exception e) { + log.error(errorMessage, e); + } + return false; + } + + /** + * Checks whether the inbound endpoint is deactivated. + *

+ * This method delegates the check to the underlying {@link InboundRequestProcessor}, + * which determines the deactivation state of the inbound endpoint. + *

+ * + * @return {@code true} if the inbound endpoint is deactivated; {@code false} otherwise. + */ + public boolean isDeactivated() { + + if (Objects.isNull(this.inboundRequestProcessor)) { + return true; + } + return inboundRequestProcessor.isDeactivated(); + } + public String getFileName() { return fileName; } @@ -353,4 +440,120 @@ public void addHandler(MessagingHandler handler) { this.handlers.add(handler); } + + /** + * Updates the state of the Inbound Endpoint in the registry. + * + *

This method ensures that the state of the Inbound Endpoint is persisted in + * the registry for future reference. If the registry is unavailable and state + * preservation is enabled, a warning is logged, and the state will not be updated. + * + * @param state the {@link InboundEndpointState} to be saved in the registry + */ + private void setInboundEndpointStateInRegistry(InboundEndpointState state) { + if (Objects.isNull(registry)) { + log.warn("Registry not available! The state of the Inbound Endpoint will not be saved."); + return; + } + registry.newNonEmptyResource(REG_INBOUND_ENDPOINT_BASE_PATH + getName(), false, "text/plain", + state.toString(), INBOUND_ENDPOINT_STATE); + } + + /** + * Deletes the state of the Inbound Endpoint from the registry. + * + *

This method removes the registry entry corresponding to the current + * Inbound Endpoint's state, if it exists. If the registry is unavailable, + * the operation is skipped. + */ + private void deleteInboundEndpointStateInRegistry() { + if (Objects.isNull(registry)) { + return; + } + if (registry.getResourceProperties(REG_INBOUND_ENDPOINT_BASE_PATH + getName()) != null) { + registry.delete(REG_INBOUND_ENDPOINT_BASE_PATH + getName()); + } + } + + /** + * Determines whether the inbound endpoint should start in paused mode. + * + * This method evaluates the `preserveState` flag and the current state of the inbound endpoint + * to decide if the endpoint should start in a paused state. + * + * - If `preserveState` is false or if the current state in the registry is {@link InboundEndpointState#INITIAL}, + * it returns the value of `suspend` attribute in the inbound endpoint configuration`. + * - Otherwise, it checks if the current state is {@link InboundEndpointState#INACTIVE} + * and returns `true` if it is, indicating the endpoint should start in paused mode. + * + * @return {@code true} if the inbound endpoint should start in paused mode, {@code false} otherwise. + */ + private boolean startInPausedMode() { + + if (getInboundEndpointStateFromRegistry() == InboundEndpointState.INITIAL) { + return isSuspend(); + } + return (getInboundEndpointStateFromRegistry() == InboundEndpointState.INACTIVE); + } + + /** + * Retrieves the current state of the inbound endpoint from the registry. + * + * This method checks the registry for the state of the inbound endpoint associated with + * the provided name. It first fetches the resource properties of the inbound endpoint from + * the registry. If no properties are found, the method assumes the state is {@link InboundEndpointState#INITIAL}. + * + * If the state is present, it determines whether the state is {@link InboundEndpointState#ACTIVE}. + * or {@link InboundEndpointState#INACTIVE}. + * + * @return The current state of the inbound endpoint, as either {@link InboundEndpointState#ACTIVE}, + * {@link InboundEndpointState#INACTIVE}, or {@link InboundEndpointState#INITIAL} if not explicitly set. + */ + private InboundEndpointState getInboundEndpointStateFromRegistry() { + Properties resourceProperties = null; + if (Objects.nonNull(registry)) { + resourceProperties = registry.getResourceProperties(REG_INBOUND_ENDPOINT_BASE_PATH + getName()); + } + + if (resourceProperties == null) { + return InboundEndpointState.INITIAL; + } + + String state = resourceProperties.getProperty(INBOUND_ENDPOINT_STATE); + if (InboundEndpointState.ACTIVE.toString().equalsIgnoreCase(state)) { + return InboundEndpointState.ACTIVE; + } + return InboundEndpointState.INACTIVE; + } + + private enum InboundEndpointState { + INITIAL, ACTIVE, INACTIVE + } + + /** + * Updates the state of the inbound endpoint to either paused or active based on the given parameter. + *

+ * This method attempts to change the state of the inbound endpoint and ensures that the state in + * the registry matches the expected behavior. If the operation fails to align the actual state + * with the requested state, a warning is logged to indicate the potential for inconsistent behavior. + * + * @param pause {@code true} to pause the inbound endpoint, setting its state to {@code INACTIVE}; + * {@code false} to resume the inbound endpoint, setting its state to {@code ACTIVE}. + */ + public void updateInboundEndpointState(boolean pause) { + + if (Objects.isNull(inboundRequestProcessor)) { + log.error("Unable to update the state of the Inbound Endpoint [" + getName() + "] as it does not exist!"); + } + if (pause && inboundRequestProcessor.isDeactivated()) { + setInboundEndpointStateInRegistry(InboundEndpointState.INACTIVE); + } else if (!pause && !inboundRequestProcessor.isDeactivated()){ + setInboundEndpointStateInRegistry(InboundEndpointState.ACTIVE); + } else { + log.warn("The inbound endpoint [" + name + "] was requested to change its state to " + + (pause ? "pause" : "resume") + ", but the operation did not complete successfully " + + "as the actual state does not match the expected state."); + } + } + } diff --git a/modules/core/src/main/java/org/apache/synapse/inbound/InboundEndpointConstants.java b/modules/core/src/main/java/org/apache/synapse/inbound/InboundEndpointConstants.java index 613730e61c..cfee8d8988 100644 --- a/modules/core/src/main/java/org/apache/synapse/inbound/InboundEndpointConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/inbound/InboundEndpointConstants.java @@ -25,6 +25,7 @@ public class InboundEndpointConstants { public static final String INBOUND_ENDPOINT_PROTOCOL = "protocol"; public static final String INBOUND_ENDPOINT_CLASS = "class"; public static final String INBOUND_ENDPOINT_SUSPEND = "suspend"; + public static final String INBOUND_ENDPOINT_PRESERVE_STATE = "preserve.state"; public static final String INBOUND_ENDPOINT_SEQUENCE = "sequence"; public static final String INBOUND_ENDPOINT_ERROR_SEQUENCE = "onError"; public static final String INBOUND_ENDPOINT_PARAMETERS = "parameters"; diff --git a/modules/core/src/main/java/org/apache/synapse/inbound/InboundProcessorParams.java b/modules/core/src/main/java/org/apache/synapse/inbound/InboundProcessorParams.java index 9fdd89fbdd..d24475e847 100644 --- a/modules/core/src/main/java/org/apache/synapse/inbound/InboundProcessorParams.java +++ b/modules/core/src/main/java/org/apache/synapse/inbound/InboundProcessorParams.java @@ -20,7 +20,6 @@ import org.apache.synapse.core.SynapseEnvironment; import java.util.List; -import java.util.Map; import java.util.Properties; /** @@ -36,6 +35,17 @@ public class InboundProcessorParams { private String onErrorSeq; private SynapseEnvironment synapseEnvironment; private List handlers; + private boolean startInPausedMode; + + public boolean startInPausedMode() { + + return startInPausedMode; + } + + public void setStartInPausedMode(boolean startInPausedMode) { + + this.startInPausedMode = startInPausedMode; + } /** * Get the name of the inbound endpoint diff --git a/modules/core/src/main/java/org/apache/synapse/inbound/InboundRequestProcessor.java b/modules/core/src/main/java/org/apache/synapse/inbound/InboundRequestProcessor.java index 113f7d291d..f3fe5aae7a 100644 --- a/modules/core/src/main/java/org/apache/synapse/inbound/InboundRequestProcessor.java +++ b/modules/core/src/main/java/org/apache/synapse/inbound/InboundRequestProcessor.java @@ -23,4 +23,10 @@ public interface InboundRequestProcessor { public void init(); public void destroy(); + + public boolean activate(); + + public boolean deactivate(); + + public boolean isDeactivated(); } diff --git a/modules/core/src/main/java/org/apache/synapse/startup/quartz/StartUpController.java b/modules/core/src/main/java/org/apache/synapse/startup/quartz/StartUpController.java index 2d30e2b169..9d98031b78 100644 --- a/modules/core/src/main/java/org/apache/synapse/startup/quartz/StartUpController.java +++ b/modules/core/src/main/java/org/apache/synapse/startup/quartz/StartUpController.java @@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; public class StartUpController extends AbstractStartup { @@ -90,6 +91,64 @@ public void destroy() { destroy(true); } + /** + * Deactivates the associated task by pausing its execution. + * + * @return {@code true} if the task was successfully paused; {@code false} otherwise. + * Possible reasons for returning {@code false} include: + * - The Synapse Task Manager is not initialized. + * - The Task Scheduler is null or not properly initialized. + */ + public boolean deactivateTask() { + if (!synapseTaskManager.isInitialized()) { + return false; + } + TaskScheduler taskScheduler = synapseTaskManager.getTaskScheduler(); + if (taskScheduler == null || !taskScheduler.isTaskSchedulerInitialized()) { + return false; + } + return taskScheduler.pauseTask(taskDescription.getName()); + } + + /** + * Activates the associated task by resuming its execution. + * + * @return {@code true} if the task was successfully resumed; {@code false} otherwise. + * Possible reasons for returning {@code false} include: + * - The Synapse Task Manager is not initialized. + * - The Task Scheduler is null or not properly initialized. + */ + public boolean activateTask() { + if (!synapseTaskManager.isInitialized()) { + return false; + } + TaskScheduler taskScheduler = synapseTaskManager.getTaskScheduler(); + if (taskScheduler == null || !taskScheduler.isTaskSchedulerInitialized()) { + return false; + } + return taskScheduler.resumeTask(taskDescription.getName()); + } + + /** + * Checks if the associated task is currently active. + * + * @return {@code true} if the task is active (i.e., not deactivated); {@code false} otherwise. + * Possible reasons for returning {@code false} include: + * - The Synapse Task Manager is not initialized. + * - The Task Scheduler is null or not properly initialized. + * - The task is explicitly marked as deactivated. + */ + public boolean isTaskActive() { + if (!synapseTaskManager.isInitialized()) { + return false; + } + TaskScheduler taskScheduler = synapseTaskManager.getTaskScheduler(); + if (taskScheduler == null || !taskScheduler.isTaskSchedulerInitialized()) { + return false; + } + return !taskScheduler.isTaskDeactivated(taskDescription.getName()); + } + public void init(SynapseEnvironment synapseEnvironment) { this.synapseEnvironment = synapseEnvironment; if (taskDescription == null) { @@ -219,11 +278,14 @@ private boolean resolveTaskImpl(TaskDescription taskDescription, SynapseEnvironm String taskImplClassName = taskDescription.getTaskImplClassName(); if (taskImplClassName == null || taskImplClassName.isEmpty()) { taskImplClassName = "org.apache.synapse.startup.tasks.MessageInjector"; + taskDescription.setTaskImplClassName(taskImplClassName); } - taskDescription.setTaskImplClassName(taskImplClassName); try { - task = getClass().getClassLoader().loadClass( - taskDescription.getTaskImplClassName()).newInstance(); + task = taskDescription.getResource(TaskDescription.INSTANCE); + if (Objects.isNull(task)) { + task = getClass().getClassLoader().loadClass( + taskDescription.getTaskImplClassName()).newInstance(); + } if (!(task instanceof Task)) { logger.warn("Task implementation is not a Synapse Task."); } diff --git a/modules/tasks/src/main/java/org/apache/synapse/task/TaskDescription.java b/modules/tasks/src/main/java/org/apache/synapse/task/TaskDescription.java index 4586d785a9..abc3a94200 100644 --- a/modules/tasks/src/main/java/org/apache/synapse/task/TaskDescription.java +++ b/modules/tasks/src/main/java/org/apache/synapse/task/TaskDescription.java @@ -77,6 +77,12 @@ public void addProperty(String name, String value) { properties.put(name, value); } + public void removeProperty(String name) { + if (properties != null && name != null) { + properties.remove(name); + } + } + public Object getProperty(String name) { return properties == null || name == null ? null : properties.get(name); } diff --git a/modules/tasks/src/main/java/org/apache/synapse/task/TaskScheduler.java b/modules/tasks/src/main/java/org/apache/synapse/task/TaskScheduler.java index 701d7fa87a..96d5c282c2 100644 --- a/modules/tasks/src/main/java/org/apache/synapse/task/TaskScheduler.java +++ b/modules/tasks/src/main/java/org/apache/synapse/task/TaskScheduler.java @@ -174,6 +174,40 @@ public void deleteTask(String name, String group) { taskManager.delete(name + "::" + group); } } + + /** + * Resumes a specified task. + * + * @param name The name of the task to resume. + * @return {@code true} if the task was successfully resumed; {@code false} if the task + * scheduler is not initialized or if the task resumption fails. + */ + public boolean resumeTask(String name) { + synchronized (lock) { + if (!initialized) { + logger.error("Could not resume the task [" + name + "]. Task scheduler not properly initialized."); + return false; + } + return taskManager.resume(name); + } + } + + /** + * Pauses a specified task. + * + * @param name The name of the task to resume. + * @return {@code true} if the task was successfully paused; {@code false} if the task + * scheduler is not initialized or if the task could not be paused. + */ + public boolean pauseTask(String name) { + synchronized (lock) { + if (!initialized) { + logger.error("Could not pause the task [" + name + "]. Task scheduler not properly initialized."); + return false; + } + return taskManager.pause(name); + } + } public int getRunningTaskCount() { synchronized (lock) { @@ -195,6 +229,22 @@ public boolean isTaskAlreadyRunning(Object taskKey) { } } + /** + * Checks if a task with the specified name is deactivated. + * + * @param name the name of the task to check. + * @return {@code true} if the task is deactivated, {@code false} otherwise or if the scheduler is not initialized. + */ + public boolean isTaskDeactivated(String name) { + synchronized (lock) { + if (!initialized) { + logger.error("Could not determine task status. Task scheduler not properly initialized."); + return false; + } + return taskManager.isTaskDeactivated(name); + } + } + public void setTriggerFactory(Object triggerFactory) { synchronized (lock) { if (!initialized) {