Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ interface PackageName {
String APPSMITH_AI_PLUGIN = "appsmithai-plugin";
String DATABRICKS_PLUGIN = "databricks-plugin";
String AWS_LAMBDA_PLUGIN = "aws-lambda-plugin";
String MONGO_PLUGIN = "mongo-plugin";
}

public static final String DEFAULT_REST_DATASOURCE = "DEFAULT_REST_DATASOURCE";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.appsmith.server.domains;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.time.Instant;

@Getter
@Setter
@ToString
public class DatasourcePluginContext<T> {
private T connection;
private String pluginId;
private Instant creationTime;

public DatasourcePluginContext() {
creationTime = Instant.now();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@
import com.appsmith.server.datasourcestorages.base.DatasourceStorageService;
import com.appsmith.server.domains.DatasourceContext;
import com.appsmith.server.domains.DatasourceContextIdentifier;
import com.appsmith.server.domains.DatasourcePluginContext;
import com.appsmith.server.domains.Plugin;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.helpers.PluginExecutorHelper;
import com.appsmith.server.plugins.base.PluginService;
import com.appsmith.server.services.ConfigService;
import com.appsmith.server.solutions.DatasourcePermission;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
Expand All @@ -29,15 +34,34 @@
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;

@Slf4j
public class DatasourceContextServiceCEImpl implements DatasourceContextServiceCE {

// DatasourceContextIdentifier contains datasourceId & environmentId which is mapped to DatasourceContext
protected final Map<DatasourceContextIdentifier, Mono<DatasourceContext<Object>>> datasourceContextMonoMap;
protected final Map<DatasourceContextIdentifier, Object> datasourceContextSynchronizationMonitorMap;
protected final Map<DatasourceContextIdentifier, DatasourceContext<?>> datasourceContextMap;

/**
* This cache is used to store the datasource context for a limited time and a limited max number of connections and
* then destroy the least recently used connection. The cleanup process is triggered when the cache is accessed and
* either the time limit or the max connections are reached.
* The purpose of this is to prevent the large number of open dangling connections to the movies mockDB.
* The removalListener method is called when the connection is removed from the cache.
*/
protected final Cache<DatasourceContextIdentifier, DatasourcePluginContext> datasourcePluginContextMapLRUCache =
CacheBuilder.newBuilder()
.removalListener(createRemovalListener())
.expireAfterAccess(2, TimeUnit.HOURS)
.maximumSize(300) // caches most recently used 300 mock connections per pod
.build();

private final DatasourceService datasourceService;
private final DatasourceStorageService datasourceStorageService;
private final PluginService pluginService;
Expand Down Expand Up @@ -67,6 +91,50 @@ public DatasourceContextServiceCEImpl(
this.datasourcePermission = datasourcePermission;
}

private RemovalListener<DatasourceContextIdentifier, DatasourcePluginContext> createRemovalListener() {
return (RemovalNotification<DatasourceContextIdentifier, DatasourcePluginContext> removalNotification) -> {
handleRemoval(removalNotification);
};
}

private Object getConnectionFromDatasourceContextMap(DatasourceContextIdentifier datasourceContextIdentifier) {
return this.datasourceContextMap.containsKey(datasourceContextIdentifier)
&& this.datasourceContextMap.get(datasourceContextIdentifier) != null
? this.datasourceContextMap.get(datasourceContextIdentifier).getConnection()
: null;
}

private void handleRemoval(
RemovalNotification<DatasourceContextIdentifier, DatasourcePluginContext> removalNotification) {
final DatasourceContextIdentifier datasourceContextIdentifier = removalNotification.getKey();
final DatasourcePluginContext datasourcePluginContext = removalNotification.getValue();

log.debug(
"Removing Datasource Context from cache and closing the open connection for DatasourceId: {} and environmentId: {}",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
log.info("LRU Cache Size after eviction: {}", datasourcePluginContextMapLRUCache.size());

// Close connection and remove entry from both cache maps
final Object connection = getConnectionFromDatasourceContextMap(datasourceContextIdentifier);

Mono<Plugin> pluginMono =
pluginService.findById(datasourcePluginContext.getPluginId()).cache();
if (connection != null) {
pluginExecutorHelper
.getPluginExecutor(pluginMono)
.flatMap(pluginExecutor -> Mono.fromRunnable(() -> pluginExecutor.datasourceDestroy(connection)))
.onErrorResume(e -> {
log.error("Error destroying stale datasource connection", e);
return Mono.empty();
})
.subscribe(); // Trigger the execution
}
// Remove the entries from both maps
datasourceContextMonoMap.remove(datasourceContextIdentifier);
datasourceContextMap.remove(datasourceContextIdentifier);
}

/**
* This method defines a critical section that can be executed only by one thread at a time per datasource id - i
* .e. if two threads want to create datasource for different datasource ids then they would not be synchronized.
Expand Down Expand Up @@ -115,6 +183,11 @@ public Mono<DatasourceContext<Object>> getCachedDatasourceContextMono(
}
datasourceContextMonoMap.remove(datasourceContextIdentifier);
datasourceContextMap.remove(datasourceContextIdentifier);
log.info(
"Invalidating the LRU cache entry for datasource id {}, environment id {} as the connection is stale or in error state",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
datasourcePluginContextMapLRUCache.invalidate(datasourceContextIdentifier);
}

/*
Expand All @@ -129,17 +202,13 @@ public Mono<DatasourceContext<Object>> getCachedDatasourceContextMono(
+ ": Cached resource context mono exists for datasource id {}, environment id {}. Returning the same.",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
// Accessing the LRU cache to update the last accessed time
datasourcePluginContextMapLRUCache.getIfPresent(datasourceContextIdentifier);
return datasourceContextMonoMap.get(datasourceContextIdentifier);
}

/* Create a fresh datasource context */
DatasourceContext<Object> datasourceContext = new DatasourceContext<>();
if (datasourceContextIdentifier.isKeyValid() && shouldCacheContextForThisPlugin(plugin)) {
/* For this datasource, either the context doesn't exist, or the context is stale. Replace (or add) with
the new connection in the context map. */
datasourceContextMap.put(datasourceContextIdentifier, datasourceContext);
}

Mono<Object> connectionMonoCache = pluginExecutor
.datasourceCreate(datasourceStorage.getDatasourceConfiguration())
.cache();
Expand All @@ -159,15 +228,34 @@ public Mono<DatasourceContext<Object>> getCachedDatasourceContextMono(
datasourceContext)
.cache(); /* Cache the value so that further evaluations don't result in new connections */

if (datasourceContextIdentifier.isKeyValid() && shouldCacheContextForThisPlugin(plugin)) {
datasourceContextMonoMap.put(datasourceContextIdentifier, datasourceContextMonoCache);
}
log.debug(
Thread.currentThread().getName()
+ ": Cached new datasource context for datasource id {}, environment id {}",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
return datasourceContextMonoCache;
return connectionMonoCache
.flatMap(connection -> {
datasourceContext.setConnection(connection);
if (datasourceContextIdentifier.isKeyValid()
&& shouldCacheContextForThisPlugin(plugin)) {
datasourceContextMap.put(datasourceContextIdentifier, datasourceContext);
datasourceContextMonoMap.put(
datasourceContextIdentifier, datasourceContextMonoCache);

if (TRUE.equals(datasourceStorage.getIsMock())
&& PluginConstants.PackageName.MONGO_PLUGIN.equals(
plugin.getPackageName())) {
log.info(
"Datasource is a mock mongo DB. Adding the connection to LRU cache!");
DatasourcePluginContext<Object> datasourcePluginContext =
new DatasourcePluginContext<>();
datasourcePluginContext.setConnection(datasourceContext.getConnection());
datasourcePluginContext.setPluginId(plugin.getId());
datasourcePluginContextMapLRUCache.put(
datasourceContextIdentifier, datasourcePluginContext);
log.info(
"LRU Cache Size after adding: {}",
datasourcePluginContextMapLRUCache.size());
}
}
return datasourceContextMonoCache;
})
.switchIfEmpty(datasourceContextMonoCache);
}
})
.flatMap(obj -> obj)
Expand Down Expand Up @@ -195,7 +283,7 @@ public Mono<Object> updateDatasourceAndSetAuthentication(Object connection, Data
.setAuthentication(updatableConnection.getAuthenticationDTO(
datasourceStorage.getDatasourceConfiguration().getAuthentication()));
datasourceStorageMono = datasourceStorageService.updateDatasourceStorage(
datasourceStorage, datasourceStorage.getEnvironmentId(), Boolean.FALSE, false);
datasourceStorage, datasourceStorage.getEnvironmentId(), FALSE, false);
}
return datasourceStorageMono.thenReturn(connection);
}
Expand Down Expand Up @@ -308,6 +396,8 @@ public Mono<DatasourceContext<?>> getDatasourceContext(DatasourceStorage datasou
} else {
if (isValidDatasourceContextAvailable(datasourceStorage, datasourceContextIdentifier)) {
log.debug("Resource context exists. Returning the same.");
// Accessing the LRU cache to update the last accessed time
datasourcePluginContextMapLRUCache.getIfPresent(datasourceContextIdentifier);
return Mono.just(datasourceContextMap.get(datasourceContextIdentifier));
}
}
Expand Down Expand Up @@ -399,7 +489,11 @@ public Mono<DatasourceContext<?>> deleteDatasourceContext(DatasourceStorage data
log.info("Clearing datasource context for datasource storage ID {}.", datasourceStorage.getId());
pluginExecutor.datasourceDestroy(datasourceContext.getConnection());
datasourceContextMonoMap.remove(datasourceContextIdentifier);

log.info(
"Invalidating the LRU cache entry for datasource id {}, environment id {} as delete datasource context is invoked",
datasourceContextIdentifier.getDatasourceId(),
datasourceContextIdentifier.getEnvironmentId());
datasourcePluginContextMapLRUCache.invalidate(datasourceContextIdentifier);
if (!datasourceContextMap.containsKey(datasourceContextIdentifier)) {
log.info(
"datasourceContextMap does not contain any entry for datasource storage with id: {} ",
Expand Down