Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,15 @@ public synchronized long getRetainedSize()
return INSTANCE_SIZE + estimatedSizeOf(writers, ExchangeStorageWriter::getRetainedSize);
}

@GuardedBy("this")
private void setupWriterForNextPart()
{
currentWriter = exchangeStorage.createExchangeStorageWriter(
outputDirectory.resolve(partitionId + "_" + writers.size() + DATA_FILE_SUFFIX), secretKey);
writers.add(currentWriter);
}

@GuardedBy("this")
private void writeInternal(Slice slice)
{
int position = 0;
Expand All @@ -359,6 +361,7 @@ private void writeInternal(Slice slice)
}
}

@GuardedBy("this")
private void flushIfNeeded(boolean finished)
{
SliceOutput buffer = currentBuffer;
Expand Down Expand Up @@ -410,9 +413,7 @@ public synchronized CompletableFuture<Void> isBlocked()
}
return blockedFuture;
}
else {
return NOT_BLOCKED;
}
return NOT_BLOCKED;
}

public synchronized SliceOutput take()
Expand Down Expand Up @@ -456,9 +457,7 @@ public synchronized long getRetainedSize()
if (closed) {
return INSTANCE_SIZE;
}
else {
return INSTANCE_SIZE + numBuffers * bufferRetainedSize;
}
return INSTANCE_SIZE + numBuffers * bufferRetainedSize;
}

public void close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ public S3FileSystemExchangeStorage(S3FileSystemExchangeStorageStats stats, Excha

@Override
public void createDirectories(URI dir)
throws IOException
{
// Nothing to do for S3
}
Expand Down Expand Up @@ -245,48 +244,52 @@ public ListenableFuture<Void> createEmptyFile(URI file)
public ListenableFuture<Void> deleteRecursively(List<URI> directories)
{
if (compatibilityMode == GCP) {
// GCS is not compatible with S3's multi-object delete API https://cloud.google.com/storage/docs/migrating#methods-comparison
Storage storage = gcsClient.orElseThrow(() -> new IllegalStateException("gcsClient is expected to be initialized"));
ListeningExecutorService deleteExecutor = gcsDeleteExecutor.orElseThrow(() -> new IllegalStateException("gcsDeleteExecutor is expected to be initialized"));
return stats.getDeleteRecursively().record(asVoid(deleteExecutor.submit(() -> {
StorageBatch batch = storage.batch();
for (URI dir : directories) {
Page<Blob> blobs = storage.list(getBucketName(dir), Storage.BlobListOption.prefix(keyFromUri(dir)));
for (Blob blob : blobs.iterateAll()) {
batch.delete(blob.getBlobId());
}
}
batch.submit();
})));
return deleteRecursivelyGcp(directories);
}

ImmutableMultimap.Builder<String, ListenableFuture<List<String>>> bucketToListObjectsFuturesBuilder = ImmutableMultimap.builder();
for (URI dir : directories) {
ImmutableList.Builder<String> keys = ImmutableList.builder();
ListenableFuture<List<String>> listObjectsFuture = Futures.transform(
toListenableFuture((listObjectsRecursively(dir)
.subscribe(listObjectsV2Response -> listObjectsV2Response.contents().stream()
.map(S3Object::key)
.forEach(keys::add)))),
ignored -> keys.build(),
directExecutor());
bucketToListObjectsFuturesBuilder.put(getBucketName(dir), listObjectsFuture);
}
Multimap<String, ListenableFuture<List<String>>> bucketToListObjectsFutures = bucketToListObjectsFuturesBuilder.build();

ImmutableList.Builder<ListenableFuture<List<DeleteObjectsResponse>>> deleteObjectsFutures = ImmutableList.builder();
for (String bucketName : bucketToListObjectsFutures.keySet()) {
deleteObjectsFutures.add(Futures.transformAsync(
Futures.allAsList(bucketToListObjectsFutures.get(bucketName)),
keys -> deleteObjects(
bucketName,
keys.stream()
.flatMap(Collection::stream)
.collect(toImmutableList())),
directExecutor()));
}
else {
ImmutableMultimap.Builder<String, ListenableFuture<List<String>>> bucketToListObjectsFuturesBuilder = ImmutableMultimap.builder();
return translateFailures(Futures.allAsList(deleteObjectsFutures.build()));
}

private ListenableFuture<Void> deleteRecursivelyGcp(List<URI> directories)
{
// GCS is not compatible with S3's multi-object delete API https://cloud.google.com/storage/docs/migrating#methods-comparison
Storage storage = gcsClient.orElseThrow(() -> new IllegalStateException("gcsClient is expected to be initialized"));
ListeningExecutorService deleteExecutor = gcsDeleteExecutor.orElseThrow(() -> new IllegalStateException("gcsDeleteExecutor is expected to be initialized"));
return stats.getDeleteRecursively().record(translateFailures(deleteExecutor.submit(() -> {
StorageBatch batch = storage.batch();
for (URI dir : directories) {
ImmutableList.Builder<String> keys = ImmutableList.builder();
ListenableFuture<List<String>> listObjectsFuture = Futures.transform(
toListenableFuture((listObjectsRecursively(dir)
.subscribe(listObjectsV2Response -> listObjectsV2Response.contents().stream()
.map(S3Object::key)
.forEach(keys::add)))),
ignored -> keys.build(),
directExecutor());
bucketToListObjectsFuturesBuilder.put(getBucketName(dir), listObjectsFuture);
}
Multimap<String, ListenableFuture<List<String>>> bucketToListObjectsFutures = bucketToListObjectsFuturesBuilder.build();

ImmutableList.Builder<ListenableFuture<List<DeleteObjectsResponse>>> deleteObjectsFutures = ImmutableList.builder();
for (String bucketName : bucketToListObjectsFutures.keySet()) {
deleteObjectsFutures.add(Futures.transformAsync(
Futures.allAsList(bucketToListObjectsFutures.get(bucketName)),
keys -> deleteObjects(
bucketName,
keys.stream()
.flatMap(Collection::stream)
.collect(toImmutableList())),
directExecutor()));
Page<Blob> blobs = storage.list(getBucketName(dir), Storage.BlobListOption.prefix(keyFromUri(dir)));
for (Blob blob : blobs.iterateAll()) {
batch.delete(blob.getBlobId());
}
}
return translateFailures(Futures.allAsList(deleteObjectsFutures.build()));
}
batch.submit();
})));
}

@Override
Expand Down Expand Up @@ -651,6 +654,7 @@ public synchronized void close()
inProgressReadFuture = immediateVoidFuture(); // such that we don't retain reference to the buffer
}

@GuardedBy("this")
private void fillBuffer()
{
if (currentFile == null || fileOffset == currentFile.getFileSize()) {
Expand Down