Skip to content

Commit

Permalink
Merge pull request #933 from d3sw/ONECOND-2164_updated
Browse files Browse the repository at this point in the history
ONECOND-2164-Add AppConfig support in Conductor
  • Loading branch information
cmegafu authored Feb 2, 2023
2 parents 4de70de + a20238f commit 041a587
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.conductor.core.events.EventProcessor;
import com.netflix.conductor.core.events.queue.dyno.DynoEventQueueProvider;
import com.netflix.conductor.core.execution.WorkflowSweeper;
import com.netflix.conductor.core.execution.appconfig.cache.AppConfig;
import com.netflix.conductor.core.execution.batch.BatchSweeper;
import com.netflix.conductor.core.execution.batch.SherlockBatchProcessor;
import com.netflix.conductor.core.execution.tasks.*;
Expand Down Expand Up @@ -60,6 +61,7 @@ protected void configure() {
bind(ErrorLookupTask.class).asEagerSingleton();
bind(PriorityLookupTask.class).asEagerSingleton();
bind(SetVariable.class).asEagerSingleton();
bind(AppConfig.class).asEagerSingleton();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public PropertiesLoader(MetadataDAO metadata, @Named("properties") Map<String, S

@Inject
public void init() {
metadata.getConfigsByIsPreloaded(true)
metadata.getConfigs()
.forEach(it -> properties.put(it.getLeft(), StrSubstitutor.replace(it.getRight(), System.getenv())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.core.execution.ApplicationException.Code;
import com.netflix.conductor.core.execution.DeciderService.DeciderOutcome;
import com.netflix.conductor.core.execution.appconfig.cache.MetaAppConfig;
import com.netflix.conductor.core.execution.appconfig.cache.AppConfig;
import com.netflix.conductor.core.execution.tasks.SubWorkflow;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.IDGenerator;
Expand Down Expand Up @@ -112,14 +112,14 @@ public class WorkflowExecutor {

private final PropertiesLoader propertiesLoader;

private final MetaAppConfig metaAppConfig;
private final AppConfig appConfig;

@Inject
public WorkflowExecutor(MetadataDAO metadata, ExecutionDAO edao, QueueDAO queue, ErrorLookupDAO errorLookupDAO,ObjectMapper om,
AuthManager auth, Configuration config,
TaskStatusListener taskStatusListener,
WorkflowStatusListener workflowStatusListener,
PropertiesLoader propertiesLoader, MetaAppConfig metaAppConfig) {
PropertiesLoader propertiesLoader, AppConfig appConfig) {
this.metadata = metadata;
this.edao = edao;
this.queue = queue;
Expand All @@ -136,7 +136,7 @@ public WorkflowExecutor(MetadataDAO metadata, ExecutionDAO edao, QueueDAO queue,
this.authContextEnabled = Boolean.parseBoolean(config.getProperty("workflow.authcontext.enabled", "false"));
this.lazyDecider = Boolean.parseBoolean(config.getProperty("workflow.lazy.decider", "false"));
this.propertiesLoader = propertiesLoader;
this.metaAppConfig = metaAppConfig;
this.appConfig = appConfig;
}

public String startWorkflow(String name, int version, String correlationId, Map<String, Object> input) throws Exception {
Expand Down Expand Up @@ -252,9 +252,8 @@ public String startWorkflow(String workflowId, String name, int version, Map<Str
wf.setContextUser(contextUser);
wf.setVariables(workflowDef.getVariables());
Map <String, String > configValues = new HashMap<>();
configValues.put(MetaAppConfig.CC_EXTRACT_SERVER, metaAppConfig.getValue(MetaAppConfig.CC_EXTRACT_SERVER));
configValues.put(MetaAppConfig.CHECKSUM_SERVER, metaAppConfig.getValue(MetaAppConfig.CHECKSUM_SERVER));
configValues.put(MetaAppConfig.ONE_CDN_SERVER, metaAppConfig.getValue(MetaAppConfig.ONE_CDN_SERVER));
Map<String, String> configs = appConfig.getConfigs();
configs.entrySet().forEach(x-> configValues.put(x.getKey(), x.getValue()));

wf.setMetaConfigs(configValues);
if (jobPriority == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
package com.netflix.conductor.core.execution.appconfig.cache;

import com.google.inject.Inject;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.AppConfigDAO;
import org.apache.commons.lang.text.StrSubstitutor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Class to obtain the Application specific Configuration values.
*/
public class MetaAppConfig {
public class AppConfig {

public static Logger logger;

private Cache<String> appCache;
private MetadataDAO metadataDAO;

private AppConfigDAO appConfigDAO;
private final static String APP_CACHE = "APP_CACHE";
private final int TTL_SECONDS = (int) TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES);

Expand All @@ -34,10 +41,12 @@ public class MetaAppConfig {


@Inject
public MetaAppConfig(MetadataDAO metadataDAO) {
public AppConfig(AppConfigDAO appConfigDAO) {
CacheManager cacheManager = CacheManager.getInstance();
appCache = cacheManager.getCache(APP_CACHE);
this.metadataDAO = metadataDAO;
this.appConfigDAO = appConfigDAO;
logger = LogManager.getLogger(AppConfig.class);
logger.info("Initialized AppConfig");
}

/**
Expand All @@ -50,13 +59,14 @@ public MetaAppConfig(MetadataDAO metadataDAO) {
public String getValue(String key) throws Exception {
String value;
if ((value = appCache.get(key)) == null) {
synchronized (MetaAppConfig.class){
synchronized (AppConfig.class){
if ((value = appCache.get(key)) == null) {
reloadProperties(key);
value = appCache.get(key);
}
}
}
logger.info("AppConfig: Ask for " + key + ". Got " + value == null ? DEFAULT.get(key) : value);
return value == null ? DEFAULT.get(key) : value;
}

Expand All @@ -71,6 +81,41 @@ public int getIntValue(String key) throws Exception {
}


/**
* Method to obtain all the configs by querying the database
*
* @return
* @throws Exception
*/
public Map<String, String> getConfigs() throws Exception {
return appCache.getCurrentCache();
}

/**
* Method to store a new key/Value pair.
* This method will overwrite a value if the key already exists
*
* @param key
* @param value
* @throws Exception
*/
public void setValue(String key, String value) throws Exception {
appConfigDAO.setAppConfigValue(key, value);
appCache.invalidate();
}

/**
* Method to remove the configuration
*
* @param key
* @throws Exception
*/
public void removeConfig(String key) throws Exception {
appConfigDAO.removeAppConfigValue(key);
appCache.invalidate();
}


/**
* Method to refresh the cache with the values from the database
*
Expand All @@ -80,8 +125,9 @@ public int getIntValue(String key) throws Exception {
public synchronized void reloadProperties(String testKey) throws SQLException {
if (appCache.get(testKey) == null) {
appCache.invalidate();
Pair<String, String> configValue = metadataDAO.getConfigsByName(testKey);
appCache.put(configValue.getLeft(), StrSubstitutor.replace(configValue.getRight(), System.getenv()), TTL_SECONDS);
logger.info("AppConfig testKey " + testKey + ". Invalidating Cache ");
Map<String, String> configValues = appConfigDAO.getConfigs();
configValues.entrySet().forEach(configValue -> appCache.put(configValue.getKey(), StrSubstitutor.replace(configValue.getValue(), System.getenv()), TTL_SECONDS));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class Cache<T> {

Expand Down Expand Up @@ -62,6 +63,9 @@ public void expire() {
cache.entrySet().removeIf(entry -> isExpired(nowSeconds, entry.getValue()));
}

public <T> Map<String, T> getCurrentCache(){
return cache.entrySet().stream().collect(Collectors.toMap(x-> x.getKey(), x-> (T) x.getValue().getData()));
}

}

Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/AppConfigDAO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.netflix.conductor.dao;

import org.apache.commons.lang3.tuple.Pair;

import java.util.Collections;
import java.util.Map;

public interface AppConfigDAO {
public default Pair<String, String> getConfigsByName(String name){
return null;
}

public default Map<String, String> getConfigs(){
return Collections.emptyMap();
}

public default void setAppConfigValue(String key, String value){}

public default void removeAppConfigValue(String key){}
}


7 changes: 0 additions & 7 deletions core/src/main/java/com/netflix/conductor/dao/MetadataDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,6 @@ public default List<Pair<String, String>> getConfigs() {
return Collections.emptyList();
}

public default List<Pair<String, String>> getConfigsByIsPreloaded(boolean isPreloaded) {
return Collections.emptyList();
}

public default Pair<String, String> getConfigsByName(String name){
return null;
}

public default void addConfig(String name, String value) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.appconfig.cache.AppConfig;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.service.ExecutionService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,6 +37,7 @@
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -48,6 +52,7 @@
import com.netflix.conductor.core.execution.WorkflowExecutor;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;

/**
* @author Viren
Expand All @@ -70,13 +75,16 @@ public class AdminResource {
private MetadataService metaservice;
private WorkflowExecutor executor;

private AppConfig appConfig;

@Inject
public AdminResource(Configuration config, ExecutionService service, MetadataService metaservice,QueueDAO queue, MetadataDAO metadata,WorkflowExecutor executor) {
public AdminResource(Configuration config, ExecutionService service, MetadataService metaservice,QueueDAO queue, MetadataDAO metadata, AppConfig appConfig, WorkflowExecutor executor) {
this.config = config;
this.service = service;
this.metaservice = metaservice;
this.queue = queue;
this.metadata = metadata;
this.appConfig = appConfig;
this.version = "UNKNOWN";
this.buildDate = "UNKNOWN";
this.executor = executor;
Expand Down Expand Up @@ -167,7 +175,6 @@ public void deleteConfig(@PathParam("name") String name, @Context HttpHeaders he

@POST
@Consumes({MediaType.WILDCARD})
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Reload configuration parameters from the database")
@ApiImplicitParams({
@ApiImplicitParam(name = "Authorization", dataType = "string", paramType = "header")})
Expand Down Expand Up @@ -233,6 +240,69 @@ public HashMap<String, String> getEnv( @QueryParam("keys") List<String> keys){
}


@GET
@Path("/appconfig/key/{key}")
@Consumes(MediaType.TEXT_PLAIN)
@Produces({MediaType.TEXT_PLAIN})
@ApiOperation(value = "Retrieves the App Config for the key ")
public String getAppConfig(@PathParam("key") String key, @Context HttpHeaders headers) {

if (StringUtils.isEmpty(key)){
return null;
}
try {
return appConfig.getValue(key);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@PUT
@Path("/appconfig/key/{key}/value/{value}")
@ApiOperation(value = "Adds a new config value to the database")
public void setAppConfig(@PathParam("key") String key, @PathParam("value") String value, @Context HttpHeaders headers) {
try {
appConfig.setValue(key, value);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@DELETE
@Path("/appconfig/key/{key}")
@ApiOperation(value = "Delete the App Config for the key")
public void deleteAppConfig(@PathParam("key") String key, @Context HttpHeaders headers) {
try {
appConfig.removeConfig(key);
} catch (Exception e) {
throw new RuntimeException(e);
}
}


@GET
@Path("/appconfig/list")
@ApiOperation(value = "Get the list of all application configs from the database")
public Map<String, String> getAppConfigs(@Context HttpHeaders headers) {
try {
return appConfig.getConfigs();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/appconfig/refresh")
@ApiOperation(value = "Refresh the cache with list of App Configs from the database")
public void refreshAppConfig(@Context HttpHeaders headers) {
try {
appConfig.reloadProperties("");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

private boolean bypassAuth(HttpHeaders headers) {
if (!auth_referer_bypass)
return false;
Expand Down
Loading

0 comments on commit 041a587

Please sign in to comment.