Skip to content

Commit

Permalink
Merge pull request #2214 from HubSpot/break_up_caffeine_caching
Browse files Browse the repository at this point in the history
Replace caffeine cache with reference to full map
  • Loading branch information
rosalind210 authored Aug 6, 2021
2 parents 67272af + e7bc9d9 commit 916716d
Show file tree
Hide file tree
Showing 15 changed files with 387 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ public class SingularityClient {

private final Retryer<HttpResponse> httpResponseRetryer;

private boolean skipApiCache = false;

@Inject
@Deprecated
public SingularityClient(
Expand Down Expand Up @@ -411,6 +413,11 @@ public SingularityClient(
.build();
}

@Deprecated
public void setSkipApiCache(boolean skipApiCache) {
this.skipApiCache = skipApiCache;
}

private String getApiBase(String host) {
return String.format(BASE_API_FORMAT, ssl ? "https" : "http", host, contextPath);
}
Expand Down Expand Up @@ -899,6 +906,16 @@ public Optional<SingularityRequestParent> getSingularityRequest(String requestId
final Function<String, String> singularityApiRequestUri = host ->
String.format(REQUEST_GET_FORMAT, getApiBase(host), requestId);

if (skipApiCache) {
return getSingleWithParams(
singularityApiRequestUri,
"request",
requestId,
Optional.of(ImmutableMap.of("skipCache", true)),
SingularityRequestParent.class
);
}

return getSingle(
singularityApiRequestUri,
"request",
Expand Down Expand Up @@ -1239,7 +1256,14 @@ public Collection<SingularityRequestParent> getSingularityRequests(
return getCollectionWithParams(
requestUri,
"[ACTIVE, PAUSED, COOLDOWN] requests",
Optional.of(ImmutableMap.of("includeFullRequestData", includeFullRequestData)),
Optional.of(
ImmutableMap.of(
"includeFullRequestData",
includeFullRequestData,
"skipCache",
skipApiCache
)
),
REQUESTS_COLLECTION
);
}
Expand Down
5 changes: 0 additions & 5 deletions SingularityService/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,29 @@ public synchronized ScheduledExecutorService get(
return service;
}

public synchronized ScheduledExecutorService getSingleThreaded(String name) {
return getSingleThreaded(name, false);
}

public synchronized ScheduledExecutorService getSingleThreaded(
String name,
boolean isLeaderOnlyPoller
) {
checkState(!stopped.get(), "already stopped");

ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(true).build()
);

if (isLeaderOnlyPoller) {
leaderPollerPools.put(name, service);
} else {
executorPools.put(name, service);
}

return service;
}

public void stopLeaderPollers() throws Exception {
if (!leaderStopped.getAndSet(true)) {
long timeoutLeftInMillis = timeoutInMillis;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package com.hubspot.singularity;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Function;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.hubspot.dropwizard.guicier.DropwizardAwareModule;
import com.hubspot.mesos.client.SingularityMesosClientModule;
import com.hubspot.mesos.client.UserAndPassword;
Expand All @@ -29,14 +26,10 @@
import com.hubspot.singularity.resources.SingularityOpenApiResource;
import com.hubspot.singularity.resources.SingularityResourceModule;
import com.hubspot.singularity.scheduler.SingularitySchedulerModule;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public class SingularityServiceModule
extends DropwizardAwareModule<SingularityConfiguration> {
public static final String REQUESTS_CAFFEINE_CACHE =
"singularity.service.resources.request";
private final Function<SingularityConfiguration, Module> dbModuleProvider;
private Optional<Class<? extends LoadBalancerClient>> lbClientClass = Optional.empty();

Expand Down Expand Up @@ -135,16 +128,4 @@ public IndexViewConfiguration provideIndexViewConfiguration(
.contains(SingularityAuthenticatorClass.WEBHOOK)
);
}

@Provides
@Singleton
@Named(REQUESTS_CAFFEINE_CACHE)
public Cache<String, List<SingularityRequestParent>> getRequestsCaffeineCache() {
SingularityConfiguration configuration = getConfiguration();

return Caffeine
.newBuilder()
.expireAfterWrite(configuration.getCaffeineCacheTtl(), TimeUnit.SECONDS)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,12 @@ public class SingularityConfiguration extends Configuration {
private double statusQueueNearlyFull = 0.8;

// Enable caffeine cache on heavily requested endpoint
private boolean useCaffeineCache = false;
private boolean useApiCacheInRequestManager = false;
private boolean useApiCacheInDeployManager = false;

// Caffeine cache ttl
private int caffeineCacheTtl = 1;
// Atomic Reference cache TTLs
private int deployCacheTtlInSeconds = 5;
private int requestCacheTtlInSeconds = 5;

public long getAskDriverToKillTasksAgainAfterMillis() {
return askDriverToKillTasksAgainAfterMillis;
Expand Down Expand Up @@ -2076,19 +2078,35 @@ public void setStatusQueueNearlyFull(double statusQueueNearlyFull) {
this.statusQueueNearlyFull = statusQueueNearlyFull;
}

public boolean useCaffeineCache() {
return useCaffeineCache;
public boolean useApiCacheInRequestManager() {
return useApiCacheInRequestManager;
}

public void setUseCaffeineCache(boolean useCaffeineCache) {
this.useCaffeineCache = useCaffeineCache;
public void setUseApiCacheInRequestManager(boolean useApiCacheInRequestManager) {
this.useApiCacheInRequestManager = useApiCacheInRequestManager;
}

public int getCaffeineCacheTtl() {
return caffeineCacheTtl;
public boolean useApiCacheInDeployManager() {
return useApiCacheInDeployManager;
}

public void setCaffeineCacheTtl(int caffeineCacheTtl) {
this.caffeineCacheTtl = caffeineCacheTtl;
public void setUseApiCacheInDeployManager(boolean useApiCacheInDeployManager) {
this.useApiCacheInDeployManager = useApiCacheInDeployManager;
}

public int getDeployCacheTtl() {
return deployCacheTtlInSeconds;
}

public void setDeployCacheTtl(int deployCacheTtlInSeconds) {
this.deployCacheTtlInSeconds = deployCacheTtlInSeconds;
}

public int getRequestCacheTtl() {
return requestCacheTtlInSeconds;
}

public void setRequestCacheTtl(int requestCacheTtlInSeconds) {
this.requestCacheTtlInSeconds = requestCacheTtlInSeconds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.hubspot.singularity.data;

import com.google.inject.Inject;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApiCache<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(ApiCache.class);

private final boolean isEnabled;
private final AtomicReference<Map<K, V>> zkValues;
private final Supplier<Map<K, V>> supplyMap;
private final int cacheTtl;
private final ScheduledExecutorService executor;

private LeaderLatch leaderLatch;

private ScheduledFuture<?> reloadingFuture;

@Inject
public ApiCache(
boolean isEnabled,
int cacheTtl,
Supplier<Map<K, V>> supplyMap,
ScheduledExecutorService executor
) {
this.isEnabled = isEnabled;
this.supplyMap = supplyMap;
this.zkValues = new AtomicReference<>(new HashMap<>());
this.cacheTtl = cacheTtl;
this.executor = executor;
}

public void startReloader(LeaderLatch leaderLatch) {
this.leaderLatch = leaderLatch;
if (isEnabled) {
if (leaderLatch.hasLeadership()) {
LOG.debug("Not doing initial ApiCache load");
} else {
reloadZkValues();
}
reloadingFuture =
executor.scheduleAtFixedRate(this::reloadZkValues, 0, cacheTtl, TimeUnit.SECONDS);
}
}

public void stopReloader() {
if (reloadingFuture != null) {
reloadingFuture.cancel(true);
}
}

private void reloadZkValues() {
if (!leaderLatch.hasLeadership()) {
try {
Map<K, V> newZkValues = supplyMap.get();
if (!newZkValues.isEmpty()) {
zkValues.set(newZkValues);
} else {
LOG.warn("Empty values on cache reload, keeping old values");
}
} catch (Exception e) {
LOG.warn("Reloading ApiCache failed: {}", e.getMessage());
}
} else {
if (!zkValues.get().isEmpty()) {
LOG.debug("Clearing ZK values because instance is leader");
zkValues.get().clear();
}
}
}

public V get(K key) {
V value = this.zkValues.get().get(key);

if (value == null) {
LOG.trace("ApiCache returned null for {}", key);
}

return value;
}

public Map<K, V> getAll() {
Map<K, V> allValues = this.zkValues.get();
if (allValues.isEmpty()) {
LOG.trace("ApiCache getAll returned empty");
} else {
LOG.trace("getAll returned {} values", allValues.size());
}
return allValues;
}

public Map<K, V> getAll(Collection<K> keys) {
Map<K, V> allValues = this.zkValues.get();
Map<K, V> filteredValues = keys
.stream()
.filter(allValues::containsKey)
.collect(Collectors.toMap(Function.identity(), allValues::get));

if (filteredValues.isEmpty()) {
LOG.trace("ApiCache getAll returned empty for {}", keys);
} else {
LOG.trace(
"getAll returned {} for {} amount requested",
filteredValues.size(),
keys.size()
);
}

return filteredValues;
}

public boolean isEnabled() {
return isEnabled;
}
}
Loading

0 comments on commit 916716d

Please sign in to comment.