-
Notifications
You must be signed in to change notification settings - Fork 188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace caffeine cache with reference to full map #2214
Changes from 28 commits
3b66cc1
72348b6
bc29c9c
652d42a
df17bbe
8cb4138
a020b92
7a77ce2
80bb901
1e95103
7b1ca90
e8af8b1
ee556cd
50e1dee
69ebebc
f7174d3
488b35c
dd24913
5947b22
6a0732b
9373f5e
1517b59
ebaa6ed
b2a1f25
e3e3ea2
f2a2e22
5b2069b
bfbcf36
642ebe1
c23fcaf
76aaa10
14a9ab9
9e7555b
efcd68b
5b97f8b
254d6e4
12d6162
862c22e
3514f4b
9cafb6c
2223fdb
9cdd205
73875d6
6f2849b
e7bc9d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package com.hubspot.singularity.data; | ||
|
||
import com.google.inject.Inject; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class ApiCache<K, V> { | ||
private static final Logger LOG = LoggerFactory.getLogger(ApiCache.class); | ||
|
||
public final boolean isEnabled; | ||
private final AtomicReference<Map<K, V>> zkValues; | ||
private final Supplier<Map<K, V>> supplyMap; | ||
|
||
@Inject | ||
public ApiCache( | ||
boolean isEnabled, | ||
int cacheTtl, | ||
Supplier<Map<K, V>> supplyMap, | ||
ScheduledExecutorService executor | ||
) { | ||
this.isEnabled = isEnabled; | ||
this.supplyMap = supplyMap; | ||
this.zkValues = new AtomicReference<>(new HashMap<>()); | ||
|
||
if (this.isEnabled) { | ||
executor.scheduleAtFixedRate(this::reloadZkValues, 2, cacheTtl, TimeUnit.SECONDS); | ||
} | ||
} | ||
|
||
private void reloadZkValues() { | ||
try { | ||
Map<K, V> newZkValues = supplyMap.get(); | ||
zkValues.set(newZkValues); | ||
} catch (Exception e) { | ||
LOG.warn("Reloading ApiCache failed: {}", e.getMessage()); | ||
} | ||
} | ||
|
||
public V get(K key) { | ||
return this.zkValues.get().get(key); | ||
} | ||
|
||
public Map<K, V> getAll() { | ||
Map<K, V> allValues = this.zkValues.get(); | ||
if (allValues.isEmpty()) { | ||
LOG.debug("ApiCache getAll returned empty"); | ||
} | ||
return allValues; | ||
} | ||
|
||
public Map<K, V> getAll(Collection<K> keys) { | ||
Map<K, V> allValues = this.zkValues.get(); | ||
Map<K, V> filteredValues = keys | ||
.stream() | ||
.filter(allValues::containsKey) | ||
.collect(Collectors.toMap(Function.identity(), allValues::get)); | ||
|
||
if (filteredValues.isEmpty()) { | ||
LOG.debug("ApiCache getAll returned empty for {}", keys); | ||
} | ||
|
||
return filteredValues; | ||
} | ||
|
||
public boolean isEnabled() { | ||
return isEnabled; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
import com.hubspot.singularity.SingularityDeployStatistics; | ||
import com.hubspot.singularity.SingularityDeployUpdate; | ||
import com.hubspot.singularity.SingularityDeployUpdate.DeployEventType; | ||
import com.hubspot.singularity.SingularityManagedScheduledExecutorServiceFactory; | ||
import com.hubspot.singularity.SingularityPendingDeploy; | ||
import com.hubspot.singularity.SingularityRequest; | ||
import com.hubspot.singularity.SingularityRequestDeployState; | ||
|
@@ -70,6 +71,8 @@ public class DeployManager extends CuratorAsyncManager { | |
private static final String DEPLOY_STATISTICS_KEY = "STATISTICS"; | ||
private static final String DEPLOY_RESULT_KEY = "RESULT_STATE"; | ||
|
||
private final ApiCache<String, SingularityRequestDeployState> deployCache; | ||
|
||
@Inject | ||
public DeployManager( | ||
CuratorFramework curator, | ||
|
@@ -85,7 +88,8 @@ public DeployManager( | |
IdTranscoder<SingularityDeployKey> deployKeyTranscoder, | ||
Transcoder<SingularityUpdatePendingDeployRequest> updateRequestTranscoder, | ||
ZkCache<SingularityDeploy> deploysCache, | ||
SingularityLeaderCache leaderCache | ||
SingularityLeaderCache leaderCache, | ||
SingularityManagedScheduledExecutorServiceFactory executorServiceFactory | ||
) { | ||
super(curator, configuration, metricRegistry); | ||
this.singularityEventListener = singularityEventListener; | ||
|
@@ -99,6 +103,13 @@ public DeployManager( | |
this.updateRequestTranscoder = updateRequestTranscoder; | ||
this.deploysCache = deploysCache; | ||
this.leaderCache = leaderCache; | ||
this.deployCache = | ||
new ApiCache<>( | ||
configuration.useApiCacheInDeployManager(), | ||
configuration.getDeployCacheTtl(), | ||
this::fetchAllDeployStates, | ||
executorServiceFactory.get("deploy-api-cache-reloader") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there is a version of this that returns a single threaded executor, which is all we should need for this case There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't have an option for a single threaded ScheduledExecutorService in SingularityManagedScheduledExecutorServiceFactory or SingularityManagedThreadPoolFactory but Executors has newSingleThreadScheduledExecutor that I can add. |
||
); | ||
} | ||
|
||
public List<SingularityDeployKey> getDeployIdsFor(String requestId) { | ||
|
@@ -116,6 +127,11 @@ public List<SingularityDeployKey> getAllDeployIds() { | |
return getChildrenAsIdsForParents("getAllDeployIds", paths, deployKeyTranscoder); | ||
} | ||
|
||
public Map<String, SingularityRequestDeployState> fetchAllDeployStates() { | ||
final List<String> requestIds = getChildren(BY_REQUEST_ROOT); | ||
return fetchDeployStatesByRequestIds(requestIds); | ||
} | ||
|
||
@Timed | ||
public Map<String, SingularityRequestDeployState> getRequestDeployStatesByRequestIds( | ||
Collection<String> requestIds | ||
|
@@ -124,6 +140,15 @@ public Map<String, SingularityRequestDeployState> getRequestDeployStatesByReques | |
return leaderCache.getRequestDeployStateByRequestId(requestIds); | ||
} | ||
|
||
Map<String, SingularityRequestDeployState> deployStatesByRequestIds; | ||
|
||
if (deployCache.isEnabled()) { | ||
deployStatesByRequestIds = deployCache.getAll(requestIds); | ||
if (!deployStatesByRequestIds.isEmpty()) { | ||
return deployStatesByRequestIds; | ||
} | ||
} | ||
|
||
return fetchDeployStatesByRequestIds(requestIds); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
import com.hubspot.singularity.SingularityCreateResult; | ||
import com.hubspot.singularity.SingularityDeleteResult; | ||
import com.hubspot.singularity.SingularityDeployKey; | ||
import com.hubspot.singularity.SingularityManagedScheduledExecutorServiceFactory; | ||
import com.hubspot.singularity.SingularityPendingRequest; | ||
import com.hubspot.singularity.SingularityPendingRequest.PendingType; | ||
import com.hubspot.singularity.SingularityRequest; | ||
|
@@ -32,11 +33,13 @@ | |
import com.hubspot.singularity.expiring.SingularityExpiringScale; | ||
import com.hubspot.singularity.expiring.SingularityExpiringSkipHealthchecks; | ||
import com.hubspot.singularity.scheduler.SingularityLeaderCache; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import org.apache.curator.framework.CuratorFramework; | ||
import org.apache.curator.utils.ZKPaths; | ||
|
@@ -90,6 +93,7 @@ public class RequestManager extends CuratorAsyncManager { | |
); | ||
|
||
private final Map<Class<? extends SingularityExpiringRequestActionParent<? extends SingularityExpiringRequestParent>>, Transcoder<? extends SingularityExpiringRequestActionParent<? extends SingularityExpiringRequestParent>>> expiringTranscoderMap; | ||
private final ApiCache<String, SingularityRequestWithState> requestsCache; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't update elsewhere yet because I wasn't sure if that's what we wanted to do since we were most concerned by the endpoint to get all requests There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since it all pulls from the same place, and we are constantly updating everything, I think it'd be worth it to update. I believe that the individual request endpoint was also pretty high up on the usage. Can always be a follow up PR if we want to check how effective this is first too There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the cache to the getRequests(List requestIds) and the singular getRequest(String requestId), and the singular request has a non-cache flag now for Orion usage |
||
|
||
@Inject | ||
public RequestManager( | ||
|
@@ -108,7 +112,8 @@ public RequestManager( | |
Transcoder<SingularityExpiringSkipHealthchecks> expiringSkipHealthchecksTranscoder, | ||
SingularityWebCache webCache, | ||
SingularityLeaderCache leaderCache, | ||
Transcoder<CrashLoopInfo> crashLoopInfoTranscoder | ||
Transcoder<CrashLoopInfo> crashLoopInfoTranscoder, | ||
SingularityManagedScheduledExecutorServiceFactory scheduledExecutorServiceFactory | ||
) { | ||
super(curator, configuration, metricRegistry); | ||
this.requestTranscoder = requestTranscoder; | ||
|
@@ -133,6 +138,16 @@ public RequestManager( | |
|
||
this.leaderCache = leaderCache; | ||
this.webCache = webCache; | ||
this.requestsCache = | ||
new ApiCache<>( | ||
configuration.useApiCacheInRequestManager(), | ||
configuration.getRequestCacheTtl(), | ||
() -> | ||
fetchRequests() | ||
.stream() | ||
.collect(Collectors.toMap(r -> r.getRequest().getId(), Function.identity())), | ||
scheduledExecutorServiceFactory.get("request-api-cache-reloader") | ||
); | ||
} | ||
|
||
private String getRequestPath(String requestId) { | ||
|
@@ -632,11 +647,22 @@ public List<SingularityRequestWithState> getRequests(boolean useWebCache) { | |
if (useWebCache && webCache.useCachedRequests()) { | ||
return webCache.getRequests(); | ||
} | ||
|
||
if (requestsCache.isEnabled()) { | ||
List<SingularityRequestWithState> requests = new ArrayList<>( | ||
(requestsCache.getAll()).values() | ||
); | ||
if (!requests.isEmpty()) { | ||
return requests; | ||
} | ||
} | ||
|
||
List<SingularityRequestWithState> requests = fetchRequests(); | ||
|
||
if (useWebCache) { | ||
webCache.cacheRequests(requests); | ||
} | ||
|
||
return requests; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put in a 2 second delay because the reloading thread kept starting before the CuratorFrameworkImpl started
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it make more sense to try and do the first call to these inside some of the Managed classes that run during startup. Then we can:
Thinking something like a start() method that we can call somewhere in SingularityLifecycleManaged