Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.stream.StreamSupport;

import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
Expand Down Expand Up @@ -390,8 +392,18 @@ public S3AFileStatus next() throws IOException {
status = statusBatchIterator.next();
// We remove from provided list the file status listed by S3 so that
// this does not return duplicate items.
if (providedStatus.remove(status)) {
LOG.debug("Removed the status from provided file status {}", status);

// The provided status is returned as it is assumed to have the better
// metadata (i.e. the eTag and versionId from S3Guard)
Optional<S3AFileStatus> provided =
StreamSupport.stream(providedStatus.spliterator(), false)
.filter(element -> status.equals(element)).findFirst();
if (provided.isPresent()) {
providedStatus.remove(status);
LOG.debug(
"Removed and returned the status from provided file status {}",
status);
return provided.get();
}
} else {
if (providedStatusIterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,14 @@ public void testRenameChangedFile() throws Throwable {
* copyObject() for a rename.
* The split of inconsistent responses between getObjectMetadata() and
* copyObject() is somewhat arbitrary.
* @param metadataCallsExpectedBeforeRetryLoop number of getObjectMetadata
* calls expected before the retry loop
* @return the inconsistencies for (metadata, copy)
*/
private Pair<Integer, Integer> renameInconsistencyCounts() {
int maxInconsistenciesBeforeFailure = TEST_MAX_RETRIES + 2;
private Pair<Integer, Integer> renameInconsistencyCounts(
int metadataCallsExpectedBeforeRetryLoop) {
int maxInconsistenciesBeforeFailure = TEST_MAX_RETRIES
+ metadataCallsExpectedBeforeRetryLoop;
int copyInconsistencyCount = versionCheckingIsOnServer() ? 2 : 0;

int metadataInconsistencyCount =
Expand All @@ -500,7 +504,7 @@ public void testRenameEventuallyConsistentFile() throws Throwable {
// copyObject().
// The split of inconsistent responses between getObjectMetadata() and
// copyObject() is arbitrary.
Pair<Integer, Integer> counts = renameInconsistencyCounts();
Pair<Integer, Integer> counts = renameInconsistencyCounts(1);
int metadataInconsistencyCount = counts.getLeft();
int copyInconsistencyCount = counts.getRight();
final Path testpath1 =
Expand Down Expand Up @@ -529,7 +533,9 @@ public void testRenameEventuallyConsistentFile2() throws Throwable {
AmazonS3 s3ClientSpy = Mockito.spy(fs.getAmazonS3ClientForTesting("mocking"));
fs.setAmazonS3Client(s3ClientSpy);

Pair<Integer, Integer> counts = renameInconsistencyCounts();
skipIfVersionPolicyAndNoVersionId();

Pair<Integer, Integer> counts = renameInconsistencyCounts(1);
int metadataInconsistencyCount = counts.getLeft();
int copyInconsistencyCount = counts.getRight();
final Path testpath2 =
Expand All @@ -538,7 +544,6 @@ public void testRenameEventuallyConsistentFile2() throws Throwable {
0,
metadataInconsistencyCount,
copyInconsistencyCount + 1);
skipIfVersionPolicyAndNoVersionId(testpath2);
final Path dest2 = path("dest2.dat");
if (expectedExceptionInteractions.contains(
InteractionType.EVENTUALLY_CONSISTENT_COPY)) {
Expand Down Expand Up @@ -573,7 +578,7 @@ public void testRenameEventuallyConsistentDirectory() throws Throwable {
writeDataset(fs, consistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
1024, true, true);

Pair<Integer, Integer> counts = renameInconsistencyCounts();
Pair<Integer, Integer> counts = renameInconsistencyCounts(0);
int metadataInconsistencyCount = counts.getLeft();
int copyInconsistencyCount = counts.getRight();

Expand Down Expand Up @@ -639,6 +644,9 @@ public void testReadAfterEventuallyConsistentWrite2() throws Throwable {
final Path testpath2 =
writeEventuallyConsistentFileVersion("eventually2.dat",
s3ClientSpy, TEST_MAX_RETRIES + 1, 0, 0);

skipIfVersionPolicyAndNoVersionId(testpath2);

try (FSDataInputStream instream2 = fs.open(testpath2)) {
if (expectedExceptionInteractions.contains(
InteractionType.EVENTUALLY_CONSISTENT_READ)) {
Expand Down Expand Up @@ -1012,6 +1020,9 @@ private Path writeFileWithNoVersionMetadata(String filename)
/**
* The test is invalid if the policy uses versionId but the bucket doesn't
* have versioning enabled.
*
* Tests the given file for a versionId to detect whether bucket versioning
* is enabled.
*/
private void skipIfVersionPolicyAndNoVersionId(Path testpath)
throws IOException {
Expand All @@ -1024,6 +1035,19 @@ private void skipIfVersionPolicyAndNoVersionId(Path testpath)
}
}

/**
* Like {@link #skipIfVersionPolicyAndNoVersionId(Path)} but generates a new
* file to test versionId against.
*/
private void skipIfVersionPolicyAndNoVersionId() throws IOException {
if (fs.getChangeDetectionPolicy().getSource() == Source.VersionId) {
Path versionIdFeatureTestFile = path("versionIdTest");
writeDataset(fs, versionIdFeatureTestFile, TEST_DATA_BYTES,
TEST_DATA_BYTES.length, 1024, true, true);
skipIfVersionPolicyAndNoVersionId(versionIdFeatureTestFile);
}
}

private GetObjectRequest matchingGetObjectRequest(Path path, String eTag,
String versionId) {
return ArgumentMatchers.argThat(request -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.s3a.S3AContract;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.junit.Assume;
import org.junit.Test;

Expand Down Expand Up @@ -557,6 +559,44 @@ public void testInconsistentS3ClientDeletes() throws Throwable {
);
}

/**
* Tests that the file's eTag and versionId are preserved in recursive
* listings.
*/
@Test
public void testListingReturnsVersionMetadata() throws Throwable {
S3AFileSystem fs = getFileSystem();
Assume.assumeTrue(fs.hasMetadataStore());

// write simple file
Path file = path("file1");
try (FSDataOutputStream outputStream = fs.create(file)) {
outputStream.writeChars("hello");
}

// get individual file status
FileStatus[] fileStatuses = fs.listStatus(file);
assertEquals(1, fileStatuses.length);
S3AFileStatus status = (S3AFileStatus) fileStatuses[0];
String eTag = status.getETag();
String versionId = status.getVersionId();

// get status through recursive directory listing
RemoteIterator<LocatedFileStatus> filesIterator = fs.listFiles(
file.getParent(), true);
List<LocatedFileStatus> files = Lists.newArrayList();
while (filesIterator.hasNext()) {
files.add(filesIterator.next());
}
assertEquals(1, files.size());

// ensure eTag and versionId are preserved in directory listing
S3ALocatedFileStatus locatedFileStatus =
(S3ALocatedFileStatus) files.get(0);
assertEquals(eTag, locatedFileStatus.getETag());
assertEquals(versionId, locatedFileStatus.getVersionId());
}

/**
* Assert that the two list sizes match; failure message includes the lists.
* @param message text for the assertion
Expand Down