Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public DeleteResult delete(OperationPurpose purpose) throws IOException {

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
blobStore.deleteBlobsIgnoringIfNotExists(purpose, new Iterator<>() {
blobStore.deleteBlobs(purpose, new Iterator<>() {
@Override
public boolean hasNext() {
return blobNames.hasNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,7 @@ public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) t
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) {
if (blobNames.hasNext() == false) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testRetriesAndOperationsAreTrackedSeparately() throws IOException {
false
);
case LIST_BLOBS -> blobStore.listBlobsByPrefix(purpose, randomIdentifier(), randomIdentifier());
case BLOB_BATCH -> blobStore.deleteBlobsIgnoringIfNotExists(
case BLOB_BATCH -> blobStore.deleteBlobs(
purpose,
List.of(randomIdentifier(), randomIdentifier(), randomIdentifier()).iterator()
);
Expand Down Expand Up @@ -113,7 +113,7 @@ public void testOperationPurposeIsReflectedInBlobStoreStats() throws IOException
os.flush();
});
// BLOB_BATCH
blobStore.deleteBlobsIgnoringIfNotExists(purpose, List.of(randomIdentifier(), randomIdentifier(), randomIdentifier()).iterator());
blobStore.deleteBlobs(purpose, List.of(randomIdentifier(), randomIdentifier(), randomIdentifier()).iterator());

Map<String, BlobStoreActionStats> stats = blobStore.stats();
String statsMapString = stats.toString();
Expand Down Expand Up @@ -148,10 +148,7 @@ public void testOperationPurposeIsNotReflectedInBlobStoreStatsWhenNotServerless(
os.flush();
});
// BLOB_BATCH
blobStore.deleteBlobsIgnoringIfNotExists(
purpose,
List.of(randomIdentifier(), randomIdentifier(), randomIdentifier()).iterator()
);
blobStore.deleteBlobs(purpose, List.of(randomIdentifier(), randomIdentifier(), randomIdentifier()).iterator());
}

Map<String, BlobStoreActionStats> stats = blobStore.stats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesRefe

@Override
public DeleteResult delete(OperationPurpose purpose) throws IOException {
return blobStore.deleteDirectory(purpose, path().buildAsString());
return blobStore.deleteDirectory(path().buildAsString());
}

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
blobStore.deleteBlobsIgnoringIfNotExists(purpose, new Iterator<>() {
blobStore.deleteBlobs(new Iterator<>() {
@Override
public boolean hasNext() {
return blobNames.hasNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreActionStats;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.blobstore.OptionalBytesReference;
import org.elasticsearch.common.blobstore.support.BlobContainerUtils;
import org.elasticsearch.common.blobstore.support.BlobMetadata;
Expand Down Expand Up @@ -491,18 +490,17 @@ private void writeBlobMultipart(BlobInfo blobInfo, byte[] buffer, int offset, in
/**
* Deletes the given path and all its children.
*
* @param purpose The purpose of the delete operation
* @param pathStr Name of path to delete
*/
DeleteResult deleteDirectory(OperationPurpose purpose, String pathStr) throws IOException {
DeleteResult deleteDirectory(String pathStr) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> {
DeleteResult deleteResult = DeleteResult.ZERO;
Page<Blob> page = client().list(bucketName, BlobListOption.prefix(pathStr));
do {
final AtomicLong blobsDeleted = new AtomicLong(0L);
final AtomicLong bytesDeleted = new AtomicLong(0L);
final Iterator<Blob> blobs = page.getValues().iterator();
deleteBlobsIgnoringIfNotExists(purpose, new Iterator<>() {
deleteBlobs(new Iterator<>() {
@Override
public boolean hasNext() {
return blobs.hasNext();
Expand All @@ -526,11 +524,9 @@ public String next() {
/**
* Deletes multiple blobs from the specific bucket using a batch request
*
* @param purpose the purpose of the delete operation
* @param blobNames names of the blobs to delete
*/
@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
void deleteBlobs(Iterator<String> blobNames) throws IOException {
if (blobNames.hasNext() == false) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,10 @@ public DeleteResult delete(OperationPurpose purpose) throws IOException {
return summary.getKey();
});
if (list.isTruncated()) {
blobStore.deleteBlobsIgnoringIfNotExists(purpose, blobNameIterator);
blobStore.deleteBlobs(purpose, blobNameIterator);
prevListing = list;
} else {
blobStore.deleteBlobsIgnoringIfNotExists(purpose, Iterators.concat(blobNameIterator, Iterators.single(keyPath)));
blobStore.deleteBlobs(purpose, Iterators.concat(blobNameIterator, Iterators.single(keyPath)));
break;
}
}
Expand All @@ -357,7 +357,7 @@ public DeleteResult delete(OperationPurpose purpose) throws IOException {

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
blobStore.deleteBlobsIgnoringIfNotExists(purpose, Iterators.map(blobNames, this::buildKey));
blobStore.deleteBlobs(purpose, Iterators.map(blobNames, this::buildKey));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,7 @@ public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
}

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
if (blobNames.hasNext() == false) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.blobstore.url.http.HttpURLBlobContainer;
import org.elasticsearch.common.blobstore.url.http.URLHttpClient;
import org.elasticsearch.common.blobstore.url.http.URLHttpClientSettings;
Expand All @@ -23,10 +22,8 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.CheckedFunction;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Iterator;
import java.util.List;

/**
Expand Down Expand Up @@ -109,11 +106,6 @@ public BlobContainer blobContainer(BlobPath blobPath) {
}
}

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
throw new UnsupportedOperationException("Bulk deletes are not supported in URL repositories");
}

@Override
public void close() {
// nothing to do here...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.OperationPurpose;

import java.io.IOException;
import java.util.Iterator;

final class HdfsBlobStore implements BlobStore {

Expand Down Expand Up @@ -72,11 +70,6 @@ public BlobContainer blobContainer(BlobPath path) {
return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize, securityContext, replicationFactor);
}

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
throw new UnsupportedOperationException("Bulk deletes are not supported in Hdfs repositories");
}

private Path buildHdfsPath(BlobPath blobPath) {
final Path path = translateToHdfsPath(blobPath);
if (readOnly == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ public void testSnapshotAndRestore() throws Exception {
testSnapshotAndRestore(false);
}

@Override
public void testBlobStoreBulkDeletion() throws Exception {
// HDFS does not implement bulk deletion from different BlobContainers
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(HdfsPlugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -136,11 +135,6 @@ public BlobContainer blobContainer(BlobPath path) {
return new AssertingBlobContainer(delegateBlobStore.blobContainer(path));
}

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
delegateBlobStore.deleteBlobsIgnoringIfNotExists(purpose, blobNames);
}

@Override
public void close() throws IOException {
delegateBlobStore.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
package org.elasticsearch.common.blobstore;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

/**
Expand All @@ -28,14 +26,6 @@ public interface BlobStore extends Closeable {
*/
BlobContainer blobContainer(BlobPath path);

/**
* Delete all the provided blobs from the blob store. Each blob could belong to a different {@code BlobContainer}
*
* @param purpose the purpose of the delete operation
* @param blobNames the blobs to be deleted
*/
void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException;

/**
* Returns statistics on the count of operations that have been performed on this blob store
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
blobStore.deleteBlobsIgnoringIfNotExists(purpose, Iterators.map(blobNames, blobName -> path.resolve(blobName).toString()));
blobStore.deleteBlobs(Iterators.map(blobNames, blobName -> path.resolve(blobName).toString()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.core.IOUtils;

import java.io.IOException;
Expand Down Expand Up @@ -70,8 +69,7 @@ public BlobContainer blobContainer(BlobPath path) {
return new FsBlobContainer(this, path, f);
}

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
void deleteBlobs(Iterator<String> blobNames) throws IOException {
IOException ioe = null;
long suppressedExceptions = 0;
while (blobNames.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -100,11 +99,6 @@ public BlobContainer blobContainer(BlobPath path) {
return new ConcurrencyLimitingBlobContainer(delegate.blobContainer(path), activeIndices, countDownLatch);
}

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
delegate.deleteBlobsIgnoringIfNotExists(purpose, blobNames);
}

@Override
public void close() throws IOException {
delegate.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;

class LatencySimulatingBlobStoreRepository extends FsRepository {

Expand Down Expand Up @@ -53,11 +52,6 @@ public BlobContainer blobContainer(BlobPath path) {
return new LatencySimulatingBlobContainer(blobContainer);
}

@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
fsBlobStore.deleteBlobsIgnoringIfNotExists(purpose, blobNames);
}

@Override
public void close() throws IOException {
fsBlobStore.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -70,7 +69,6 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -524,39 +522,6 @@ public void testIndicesDeletedFromRepository() throws Exception {
assertAcked(clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, repoName, "test-snap2").get());
}

public void testBlobStoreBulkDeletion() throws Exception {
Map<BlobPath, List<String>> expectedBlobsPerContainer = new HashMap<>();
try (BlobStore store = newBlobStore()) {
List<String> blobsToDelete = new ArrayList<>();
int numberOfContainers = randomIntBetween(2, 5);
for (int i = 0; i < numberOfContainers; i++) {
BlobPath containerPath = BlobPath.EMPTY.add(randomIdentifier());
final BlobContainer container = store.blobContainer(containerPath);
int numberOfBlobsPerContainer = randomIntBetween(5, 10);
for (int j = 0; j < numberOfBlobsPerContainer; j++) {
byte[] bytes = randomBytes(randomInt(100));
String blobName = randomAlphaOfLength(10);
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
if (randomBoolean()) {
blobsToDelete.add(containerPath.buildAsString() + blobName);
} else {
expectedBlobsPerContainer.computeIfAbsent(containerPath, unused -> new ArrayList<>()).add(blobName);
}
}
}

store.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobsToDelete.iterator());
for (var containerEntry : expectedBlobsPerContainer.entrySet()) {
BlobContainer blobContainer = store.blobContainer(containerEntry.getKey());
Map<String, BlobMetadata> blobsInContainer = blobContainer.listBlobs(randomPurpose());
for (String expectedBlob : containerEntry.getValue()) {
assertThat(blobsInContainer, hasKey(expectedBlob));
}
blobContainer.delete(randomPurpose());
}
}
}

public void testDanglingShardLevelBlobCleanup() throws Exception {
final var repoName = createRepository(randomRepositoryName());
final var client = client();
Expand Down
Loading