Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Caffeine for weak maps #2601

Merged
merged 20 commits into from
Mar 24, 2021
Merged
38 changes: 11 additions & 27 deletions benchmark/benchmark.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
plugins {
id "me.champeau.gradle.jmh" version "0.5.0"
id "me.champeau.jmh" version "0.6.2"
}

apply from: "$rootDir/gradle/java.gradle"
Expand All @@ -8,6 +8,10 @@ dependencies {
jmh deps.opentelemetryApi
jmh deps.bytebuddyagent

jmh project(':instrumentation-api')

jmh deps.caffeine

jmh 'javax.servlet:javax.servlet-api:4.0.1'
jmh 'com.google.http-client:google-http-client:1.19.0'
jmh 'org.eclipse.jetty:jetty-server:9.4.1.v20170120'
Expand All @@ -18,35 +22,15 @@ dependencies {
}

jmh {
timeUnit = 'ms' // Output time unit. Available time units are: [m, s, ms, us, ns].
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed these since they override annotations. Benchmarks often need specific settings so copy-pasta of annotations is better than global control

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

benchmarkMode = ['avgt']
timeOnIteration = '20s'
iterations = 1 // Number of measurement iterations to do.
fork = 1 // How many times to forks a single benchmark. Use 0 to disable forking altogether
// jvmArgs += ["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints", "-XX:StartFlightRecording=delay=5s,dumponexit=true,name=jmh-benchmark,filename=$rootDir/benchmark/build/reports/jmh/jmh-benchmark.jfr"]
// jvmArgs += ["-agentpath:$rootDir/benchmark/src/jmh/resources/libasyncProfiler.so=start,collapsed,file=$rootDir/benchmark/build/reports/jmh/profiler.txt"]

// Should JMH fail immediately if any benchmark had experienced the unrecoverable error?
failOnError = true
warmup = '5s' // Time to spend at each warmup iteration.
// warmupBatchSize = 10 // Warmup batch size: number of benchmark method calls per operation.
// How many warmup forks to make for a single benchmark. 0 to disable warmup forks.
warmupForks = 0
warmupIterations = 1 // Number of warmup iterations to do.

// profilers = ['stack:lines=5;detailLine=true;period=5;excludePackages=true']
// Use profilers to collect additional data. Supported profilers: [cl, comp, gc, stack, perf, perfnorm, perfasm, xperf, xperfasm, hs_cl, hs_comp, hs_gc, hs_rt, hs_thr]
profilers = ['io.opentelemetry.benchmark.UsedMemoryProfiler', 'gc']

// humanOutputFile = project.file("${project.buildDir}/reports/jmh/human.txt") // human-readable output file
// operationsPerInvocation = 10 // Operations per invocation.
// synchronizeIterations = false // Synchronize iterations?
timeout = '5s' // Timeout for benchmark iteration.
// includeTests = false
// Allows to include test sources into generate JMH jar, i.e. use it when benchmarks depend on the test classes.

duplicateClassesStrategy = DuplicatesStrategy.EXCLUDE
jmhVersion = '1.23' // Specifies JMH version
jmhVersion = '1.28' // Specifies JMH version

def jmhIncludeSingleClass = project.findProperty('jmhIncludeSingleClass')
if (jmhIncludeSingleClass != null) {
includes = [jmhIncludeSingleClass]
}
}

tasks.jmh.dependsOn(':javaagent:shadowJar')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.benchmark;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.opentelemetry.context.internal.shaded.WeakConcurrentMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@Fork(3)
@Warmup(iterations = 10, time = 1)
@Measurement(iterations = 5, time = 1)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
public class WeakMapBenchmark {

private static final WeakConcurrentMap<String, String> weakConcurrentMap =
new WeakConcurrentMap<>(true, true);

private static final WeakConcurrentMap<String, String> weakConcurrentMapInline =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added inline expunction benchmark too. Found very little difference in performance so settled on it since it means not having to worry about cleaner threads which are hard to manage for library instrumentation (we were using a common task executor before)

new WeakConcurrentMap.WithInlinedExpunction<>();

private static final Cache<String, String> caffeineCache =
Caffeine.newBuilder().weakKeys().build();
private static final Map<String, String> caffeineMap = caffeineCache.asMap();

private String key;

@Setup
public void setUp() {
key = new String(Thread.currentThread().getName());
}

@Benchmark
@Threads(1)
public void threads01_weakConcurrentMap(Blackhole blackhole) {
blackhole.consume(weakConcurrentMap.put(key, "foo"));
blackhole.consume(weakConcurrentMap.get(key));
blackhole.consume(weakConcurrentMap.remove(key));
}

@Benchmark
@Threads(5)
public void threads05_weakConcurrentMap(Blackhole blackhole) {
blackhole.consume(weakConcurrentMap.put(key, "foo"));
blackhole.consume(weakConcurrentMap.get(key));
blackhole.consume(weakConcurrentMap.remove(key));
}

@Benchmark
@Threads(10)
public void threads10_weakConcurrentMap(Blackhole blackhole) {
blackhole.consume(weakConcurrentMap.put(key, "foo"));
blackhole.consume(weakConcurrentMap.get(key));
blackhole.consume(weakConcurrentMap.remove(key));
}

@Benchmark
@Threads(1)
public void threads01_weakConcurrentMap_inline(Blackhole blackhole) {
blackhole.consume(weakConcurrentMapInline.put(key, "foo"));
blackhole.consume(weakConcurrentMapInline.get(key));
blackhole.consume(weakConcurrentMapInline.remove(key));
}

@Benchmark
@Threads(5)
public void threads05_weakConcurrentMap_inline(Blackhole blackhole) {
blackhole.consume(weakConcurrentMapInline.put(key, "foo"));
blackhole.consume(weakConcurrentMapInline.get(key));
blackhole.consume(weakConcurrentMapInline.remove(key));
}

@Benchmark
@Threads(10)
public void threads10_weakConcurrentMap_inline(Blackhole blackhole) {
blackhole.consume(weakConcurrentMapInline.put(key, "foo"));
blackhole.consume(weakConcurrentMapInline.get(key));
blackhole.consume(weakConcurrentMapInline.remove(key));
}

@Benchmark
@Threads(1)
public void threads01_caffeine(Blackhole blackhole) {
blackhole.consume(caffeineMap.put(key, "foo"));
blackhole.consume(caffeineMap.get(key));
blackhole.consume(caffeineMap.remove(key));
}

@Benchmark
@Threads(5)
public void threads05_caffeine(Blackhole blackhole) {
blackhole.consume(caffeineMap.put(key, "foo"));
blackhole.consume(caffeineMap.get(key));
blackhole.consume(caffeineMap.remove(key));
}

@Benchmark
@Threads(10)
public void threads10_caffeine(Blackhole blackhole) {
blackhole.consume(caffeineMap.put(key, "foo"));
blackhole.consume(caffeineMap.get(key));
blackhole.consume(caffeineMap.remove(key));
}
}
4 changes: 3 additions & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ ext {
mockito : '3.6.0',
// Caffeine 2.x to support Java 8+. 3.x is 11+.
caffeine : '2.9.0',
testcontainers : '1.15.2'
testcontainers : '1.15.2',
weaklockfree : '0.18'
]

deps = [
Expand Down Expand Up @@ -78,6 +79,7 @@ ext {
dependencies.create(group: 'io.prometheus', name: 'simpleclient_httpserver', version: "${versions.prometheus}"),
],
caffeine : "com.github.ben-manes.caffeine:caffeine:${versions.caffeine}",
weaklockfree : "com.blogspot.mydailyjava:weak-lock-free:${versions.weaklockfree}",

// Testing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ dependencies {
exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
exclude group: 'org.checkerframework', module: 'checker-qual'
}

compileOnly deps.weaklockfree
shadowInclude deps.weaklockfree
}

shadowJar {
Expand All @@ -25,6 +28,7 @@ shadowJar {
archiveClassifier.set("")

relocate "com.github.benmanes.caffeine", "io.opentelemetry.instrumentation.api.internal.shaded.caffeine"
relocate "com.blogspot.mydailyjava.weaklockfree", "io.opentelemetry.instrumentation.api.internal.shaded.weaklockfree"

minimize()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,16 @@ static CacheBuilder newBuilder() {
* computes the value using {@code mappingFunction}, stores the result, and returns it.
*/
V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction);

/**
* Returns the cached value associated with the provided {@code key} if present, or {@code null}
* otherwise.
*/
V get(K key);

/** Puts the {@code value} into the cache for the {@code key}. */
void put(K key, V value);

/** Removes a value for {@code key} if present. */
void remove(K key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
/** A builder of {@link Cache}. */
public final class CacheBuilder {

private final Caffeine<?, ?> caffeine = Caffeine.newBuilder();
private static final long UNSET = -1;

private boolean weakKeys;
private long maximumSize = UNSET;
private Executor executor = null;

/** Sets the maximum size of the cache. */
public CacheBuilder setMaximumSize(long maximumSize) {
caffeine.maximumSize(maximumSize);
this.maximumSize = maximumSize;
return this;
}

Expand All @@ -24,22 +28,35 @@ public CacheBuilder setMaximumSize(long maximumSize) {
* {@link Object#equals(Object)}.
*/
public CacheBuilder setWeakKeys() {
caffeine.weakKeys();
this.weakKeys = true;
return this;
}

// Visible for testing
CacheBuilder setExecutor(Executor executor) {
caffeine.executor(executor);
this.executor = executor;
return this;
}

/** Returns a new {@link Cache} with the settings of this {@link CacheBuilder}. */
public <K, V> Cache<K, V> build() {
if (weakKeys && maximumSize == UNSET) {
return new WeakLockFreeCache<>();
}
Caffeine<?, ?> caffeine = Caffeine.newBuilder();
if (weakKeys) {
caffeine.weakKeys();
}
if (maximumSize != UNSET) {
caffeine.maximumSize(maximumSize);
}
if (executor != null) {
caffeine.executor(executor);
}
@SuppressWarnings("unchecked")
com.github.benmanes.caffeine.cache.Cache<K, V> delegate =
(com.github.benmanes.caffeine.cache.Cache<K, V>) caffeine.build();
return new CaffeineCache<K, V>(delegate);
return new CaffeineCache<>(delegate);
}

CacheBuilder() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction
return delegate.get(key, mappingFunction);
}

@Override
public V get(K key) {
return delegate.getIfPresent(key);
}

@Override
public void put(K key, V value) {
delegate.put(key, value);
}

@Override
public void remove(K key) {
delegate.invalidate(key);
}

// Visible for testing
Set<K> keySet() {
return delegate.asMap().keySet();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.caching;

import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap;
import java.util.function.Function;

final class WeakLockFreeCache<K, V> implements Cache<K, V> {

private final WeakConcurrentMap<K, V> delegate;

WeakLockFreeCache() {
this.delegate = new WeakConcurrentMap.WithInlinedExpunction<>();
}

@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
V value = get(key);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation mostly same as WeakMapSuppliers before it, though it's using putIfAbsent so a bit better I think.

if (value != null) {
return value;
}
// Best we can do, we don't expect high contention with this implementation. Note, this
// prevents executing mappingFunction twice but it does not prevent executing mappingFunction
// if there is a concurrent put operation as would be the case for ConcurrentHashMap. However,
// we would never expect an order guarantee in this case anyways so it still has the same
// safety.
synchronized (delegate) {
value = get(key);
if (value != null) {
return value;
}
value = mappingFunction.apply(key);
V previous = delegate.putIfAbsent(key, value);
if (previous != null) {
return previous;
}
return value;
}
}

@Override
public V get(K key) {
return delegate.getIfPresent(key);
}

@Override
public void put(K key, V value) {
delegate.put(key, value);
}

@Override
public void remove(K key) {
delegate.remove(key);
}

// Visible for testing
int size() {
return delegate.approximateSize();
}
}
Loading