Skip to content

Commit 6c5a3ee

Browse files
coeuvrecopybara-github
authored andcommitted
Remote: Add AsyncTaskCache which is used to deduplicate task executions and cache the results.
PiperOrigin-RevId: 361979747
1 parent c789d78 commit 6c5a3ee

File tree

2 files changed

+477
-0
lines changed

2 files changed

+477
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Copyright 2021 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package com.google.devtools.build.lib.remote.util;
15+
16+
import com.google.common.base.Preconditions;
17+
import com.google.common.collect.ImmutableSet;
18+
import io.reactivex.rxjava3.core.Completable;
19+
import io.reactivex.rxjava3.core.Observable;
20+
import io.reactivex.rxjava3.core.Single;
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
import java.util.Optional;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import javax.annotation.concurrent.GuardedBy;
26+
import javax.annotation.concurrent.ThreadSafe;
27+
28+
/**
29+
* A cache which de-duplicates the executions and stores the results of asynchronous tasks. Each
30+
* task is identified by a key of type {@link KeyT} and has the result of type {@link ValueT}.
31+
*
32+
* <p>Use {@link #executeIfNot} or {@link #execute} and subscribe the returned {@link Single} to
33+
* start executing a task. The {@link Single} turns to completed once the task is {@code finished}.
34+
* Errors are propagated if any.
35+
*
36+
* <p>Calling {@code execute[IfNot]} multiple times with the same task key can get an {@link Single}
37+
* which connects to the same underlying execution if the task is still executing, or get a
38+
* completed {@link Single} if the task is already finished. Set {@code force} to {@code true } to
39+
* re-execute a finished task.
40+
*
41+
* <p>Dispose the {@link Single} to cancel to task execution.
42+
*/
43+
@ThreadSafe
44+
public final class AsyncTaskCache<KeyT, ValueT> {
45+
@GuardedBy("this")
46+
private final Map<KeyT, ValueT> finished;
47+
48+
@GuardedBy("this")
49+
private final Map<KeyT, Observable<ValueT>> inProgress;
50+
51+
public static <KeyT, ValueT> AsyncTaskCache<KeyT, ValueT> create() {
52+
return new AsyncTaskCache<>();
53+
}
54+
55+
private AsyncTaskCache() {
56+
this.finished = new HashMap<>();
57+
this.inProgress = new HashMap<>();
58+
}
59+
60+
/** Returns a set of keys for tasks which is finished. */
61+
public ImmutableSet<KeyT> getFinishedTasks() {
62+
synchronized (this) {
63+
return ImmutableSet.copyOf(finished.keySet());
64+
}
65+
}
66+
67+
/** Returns a set of keys for tasks which is still executing. */
68+
public ImmutableSet<KeyT> getInProgressTasks() {
69+
synchronized (this) {
70+
return ImmutableSet.copyOf(inProgress.keySet());
71+
}
72+
}
73+
74+
/**
75+
* Executes a task if it hasn't been executed.
76+
*
77+
* @param key identifies the task.
78+
* @return a {@link Single} which turns to completed once the task is finished or propagates the
79+
* error if any.
80+
*/
81+
public Single<ValueT> executeIfNot(KeyT key, Single<ValueT> task) {
82+
return execute(key, task, false);
83+
}
84+
85+
/**
86+
* Executes a task.
87+
*
88+
* @param key identifies the task.
89+
* @param force re-execute a finished task if set to {@code true}.
90+
* @return a {@link Single} which turns to completed once the task is finished or propagates the
91+
* error if any.
92+
*/
93+
public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
94+
return Single.defer(
95+
() -> {
96+
synchronized (this) {
97+
if (!force && finished.containsKey(key)) {
98+
return Single.just(finished.get(key));
99+
}
100+
101+
finished.remove(key);
102+
103+
Observable<ValueT> execution =
104+
inProgress.computeIfAbsent(
105+
key,
106+
missingKey -> {
107+
AtomicInteger subscribeTimes = new AtomicInteger(0);
108+
return Single.defer(
109+
() -> {
110+
int times = subscribeTimes.incrementAndGet();
111+
Preconditions.checkState(
112+
times == 1, "Subscribed more than once to the task");
113+
return task;
114+
})
115+
.doOnSuccess(
116+
value -> {
117+
synchronized (this) {
118+
finished.put(key, value);
119+
inProgress.remove(key);
120+
}
121+
})
122+
.doOnError(
123+
error -> {
124+
synchronized (this) {
125+
inProgress.remove(key);
126+
}
127+
})
128+
.doOnDispose(
129+
() -> {
130+
synchronized (this) {
131+
inProgress.remove(key);
132+
}
133+
})
134+
.toObservable()
135+
.publish()
136+
.refCount();
137+
});
138+
139+
return Single.fromObservable(execution);
140+
}
141+
});
142+
}
143+
144+
/** An {@link AsyncTaskCache} without result. */
145+
public static final class NoResult<KeyT> {
146+
private final AsyncTaskCache<KeyT, Optional<Void>> cache;
147+
148+
public static <KeyT> AsyncTaskCache.NoResult<KeyT> create() {
149+
return new AsyncTaskCache.NoResult<>(AsyncTaskCache.create());
150+
}
151+
152+
public NoResult(AsyncTaskCache<KeyT, Optional<Void>> cache) {
153+
this.cache = cache;
154+
}
155+
156+
/** Same as {@link AsyncTaskCache#executeIfNot} but operates on {@link Completable}. */
157+
public Completable executeIfNot(KeyT key, Completable task) {
158+
return Completable.fromSingle(
159+
cache.executeIfNot(key, task.toSingleDefault(Optional.empty())));
160+
}
161+
162+
/** Same as {@link AsyncTaskCache#executeIfNot} but operates on {@link Completable}. */
163+
public Completable execute(KeyT key, Completable task, boolean force) {
164+
return Completable.fromSingle(
165+
cache.execute(key, task.toSingleDefault(Optional.empty()), force));
166+
}
167+
168+
/** Returns a set of keys for tasks which is finished. */
169+
public ImmutableSet<KeyT> getFinishedTasks() {
170+
return cache.getFinishedTasks();
171+
}
172+
173+
/** Returns a set of keys for tasks which is still executing. */
174+
public ImmutableSet<KeyT> getInProgressTasks() {
175+
return cache.getInProgressTasks();
176+
}
177+
}
178+
}

0 commit comments

Comments
 (0)