Skip to content

Commit 235a6a5

Browse files
committed
Make FileSingleStreamSpiller fields thread safe
FileSingleStreamSpiller fields can be accessed from multiple threads, therefore they need to be thread safe.
1 parent f230413 commit 235a6a5

File tree

1 file changed

+8
-7
lines changed

1 file changed

+8
-7
lines changed

core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpiller.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import java.util.Iterator;
4545
import java.util.List;
4646
import java.util.Optional;
47+
import java.util.concurrent.atomic.AtomicBoolean;
48+
import java.util.concurrent.atomic.AtomicLong;
4749

4850
import static com.google.common.base.Preconditions.checkState;
4951
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
@@ -71,8 +73,8 @@ public class FileSingleStreamSpiller
7173

7274
private final ListeningExecutorService executor;
7375

74-
private boolean writable = true;
75-
private long spilledPagesInMemorySize;
76+
private final AtomicBoolean writable = new AtomicBoolean(true);
77+
private final AtomicLong spilledPagesInMemorySize = new AtomicLong();
7678
private ListenableFuture<Void> spillInProgress = immediateVoidFuture();
7779

7880
private final Runnable fileSystemErrorHandler;
@@ -127,7 +129,7 @@ public ListenableFuture<Void> spill(Iterator<Page> pageIterator)
127129
@Override
128130
public long getSpilledPagesInMemorySize()
129131
{
130-
return spilledPagesInMemorySize;
132+
return spilledPagesInMemorySize.longValue();
131133
}
132134

133135
@Override
@@ -145,15 +147,15 @@ public ListenableFuture<List<Page>> getAllSpilledPages()
145147

146148
private void writePages(Iterator<Page> pageIterator)
147149
{
148-
checkState(writable, "Spilling no longer allowed. The spiller has been made non-writable on first read for subsequent reads to be consistent");
150+
checkState(writable.get(), "Spilling no longer allowed. The spiller has been made non-writable on first read for subsequent reads to be consistent");
149151

150152
Optional<SecretKey> encryptionKey = this.encryptionKey;
151153
checkState(encrypted == encryptionKey.isPresent(), "encryptionKey has been discarded");
152154
PageSerializer serializer = serdeFactory.createSerializer(encryptionKey);
153155
try (SliceOutput output = new OutputStreamSliceOutput(targetFile.newOutputStream(APPEND), BUFFER_SIZE)) {
154156
while (pageIterator.hasNext()) {
155157
Page page = pageIterator.next();
156-
spilledPagesInMemorySize += page.getSizeInBytes();
158+
spilledPagesInMemorySize.addAndGet(page.getSizeInBytes());
157159
Slice serializedPage = serializer.serialize(page);
158160
long pageSize = serializedPage.length();
159161
localSpillContext.updateBytes(pageSize);
@@ -169,8 +171,7 @@ private void writePages(Iterator<Page> pageIterator)
169171

170172
private Iterator<Page> readPages()
171173
{
172-
checkState(writable, "Repeated reads are disallowed to prevent potential resource leaks");
173-
writable = false;
174+
checkState(writable.getAndSet(false), "Repeated reads are disallowed to prevent potential resource leaks");
174175

175176
try {
176177
Optional<SecretKey> encryptionKey = this.encryptionKey;

0 commit comments

Comments
 (0)