-
Notifications
You must be signed in to change notification settings - Fork 188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace caffeine cache with reference to full map #2214
Changes from 38 commits
3b66cc1
72348b6
bc29c9c
652d42a
df17bbe
8cb4138
a020b92
7a77ce2
80bb901
1e95103
7b1ca90
e8af8b1
ee556cd
50e1dee
69ebebc
f7174d3
488b35c
dd24913
5947b22
6a0732b
9373f5e
1517b59
ebaa6ed
b2a1f25
e3e3ea2
f2a2e22
5b2069b
bfbcf36
642ebe1
c23fcaf
76aaa10
14a9ab9
9e7555b
efcd68b
5b97f8b
254d6e4
12d6162
862c22e
3514f4b
9cafb6c
2223fdb
9cdd205
73875d6
6f2849b
e7bc9d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,6 +66,29 @@ public synchronized ScheduledExecutorService get( | |
return service; | ||
} | ||
|
||
public synchronized ScheduledExecutorService getSingleThreaded(String name) { | ||
return getSingleThreaded(name, false); | ||
} | ||
|
||
public synchronized ScheduledExecutorService getSingleThreaded( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like this method is unused? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was following the pattern in the SingularityManagedThreadPoolFactory for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤦 just realzied the one above calls it. Nevermind didn't even see that |
||
String name, | ||
boolean isLeaderOnlyPoller | ||
) { | ||
checkState(!stopped.get(), "already stopped"); | ||
|
||
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor( | ||
new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(true).build() | ||
); | ||
|
||
if (isLeaderOnlyPoller) { | ||
leaderPollerPools.put(name, service); | ||
} else { | ||
executorPools.put(name, service); | ||
} | ||
|
||
return service; | ||
} | ||
|
||
public void stopLeaderPollers() throws Exception { | ||
if (!leaderStopped.getAndSet(true)) { | ||
long timeoutLeftInMillis = timeoutInMillis; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
package com.hubspot.singularity.data; | ||
|
||
import com.google.inject.Inject; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class ApiCache<K, V> { | ||
private static final Logger LOG = LoggerFactory.getLogger(ApiCache.class); | ||
|
||
public final boolean isEnabled; | ||
private final AtomicReference<Map<K, V>> zkValues; | ||
private final Supplier<Map<K, V>> supplyMap; | ||
private final int cacheTtl; | ||
private final ScheduledExecutorService executor; | ||
|
||
private ScheduledFuture<?> reloadingFuture; | ||
|
||
@Inject | ||
public ApiCache( | ||
boolean isEnabled, | ||
int cacheTtl, | ||
Supplier<Map<K, V>> supplyMap, | ||
ScheduledExecutorService executor | ||
) { | ||
this.isEnabled = isEnabled; | ||
this.supplyMap = supplyMap; | ||
this.zkValues = new AtomicReference<>(new HashMap<>()); | ||
this.cacheTtl = cacheTtl; | ||
this.executor = executor; | ||
} | ||
|
||
public void startReloader() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this also synchronously perform the first fetch I wonder? Would avoid the case where getAll was incorrectly empty or get(K key) was null |
||
if (isEnabled) { | ||
reloadZkValues(); | ||
reloadingFuture = | ||
executor.scheduleAtFixedRate(this::reloadZkValues, 0, cacheTtl, TimeUnit.SECONDS); | ||
} | ||
} | ||
|
||
public void stopReloader() { | ||
if (reloadingFuture != null) { | ||
reloadingFuture.cancel(true); | ||
} | ||
} | ||
|
||
private void reloadZkValues() { | ||
try { | ||
Map<K, V> newZkValues = supplyMap.get(); | ||
if (!newZkValues.isEmpty()) { | ||
zkValues.set(newZkValues); | ||
} else { | ||
LOG.warn("Empty values on cache reload, keeping old values"); | ||
} | ||
} catch (Exception e) { | ||
LOG.warn("Reloading ApiCache failed: {}", e.getMessage()); | ||
} | ||
} | ||
|
||
public V get(K key) { | ||
V value = this.zkValues.get().get(key); | ||
|
||
if (value == null) { | ||
LOG.debug("ApiCache returned null for {}", key); | ||
} | ||
|
||
return value; | ||
} | ||
|
||
public Map<K, V> getAll() { | ||
Map<K, V> allValues = this.zkValues.get(); | ||
if (allValues.isEmpty()) { | ||
LOG.debug("ApiCache getAll returned empty"); | ||
} else { | ||
LOG.debug("getAll returned {} values", allValues.size()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for this and the debug statement above, maybe these are more like TRACE level lines? Would get pretty noisy given that we could acall these multiple times a second There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made all of the get calls' logs trace level. |
||
} | ||
return allValues; | ||
} | ||
|
||
public Map<K, V> getAll(Collection<K> keys) { | ||
Map<K, V> allValues = this.zkValues.get(); | ||
Map<K, V> filteredValues = keys | ||
.stream() | ||
.filter(allValues::containsKey) | ||
.collect(Collectors.toMap(Function.identity(), allValues::get)); | ||
|
||
if (filteredValues.isEmpty()) { | ||
LOG.debug("ApiCache getAll returned empty for {}", keys); | ||
} else { | ||
LOG.debug( | ||
"getAll returned {} for {} amount requested", | ||
filteredValues.size(), | ||
keys.size() | ||
); | ||
} | ||
|
||
return filteredValues; | ||
} | ||
|
||
public boolean isEnabled() { | ||
return isEnabled; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for structure/usability of the client. Is there a reason you have this at the class level instead of just being an arg on the relevant method(s)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked through the request call usages in Orion and talked to Suruu and we decided it would be easier on Deploy's side to have a class level argument rather than updating all usages. It also makes clean up easier for when we've solved the underlying CuratorFramework issue