Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Listing(S3AFileSystem owner) {
* @return the file status iterator
*/
ProvidedFileStatusIterator createProvidedFileStatusIterator(
FileStatus[] fileStatuses,
S3AFileStatus[] fileStatuses,
PathFilter filter,
FileStatusAcceptor acceptor) {
return new ProvidedFileStatusIterator(fileStatuses, filter, acceptor);
Expand Down Expand Up @@ -114,7 +114,7 @@ FileStatusListingIterator createFileStatusListingIterator(
S3ListRequest request,
PathFilter filter,
Listing.FileStatusAcceptor acceptor,
RemoteIterator<FileStatus> providedStatus) throws IOException {
RemoteIterator<S3AFileStatus> providedStatus) throws IOException {
return new FileStatusListingIterator(
new ObjectListingIterator(listPath, request),
filter,
Expand All @@ -129,7 +129,7 @@ FileStatusListingIterator createFileStatusListingIterator(
*/
@VisibleForTesting
LocatedFileStatusIterator createLocatedFileStatusIterator(
RemoteIterator<FileStatus> statusIterator) {
RemoteIterator<S3AFileStatus> statusIterator) {
return new LocatedFileStatusIterator(statusIterator);
}

Expand All @@ -143,7 +143,7 @@ LocatedFileStatusIterator createLocatedFileStatusIterator(
*/
@VisibleForTesting
TombstoneReconcilingIterator createTombstoneReconcilingIterator(
RemoteIterator<LocatedFileStatus> iterator, Set<Path> tombstones) {
RemoteIterator<S3LocatedFileStatus> iterator, Set<Path> tombstones) {
return new TombstoneReconcilingIterator(iterator, tombstones);
}

Expand Down Expand Up @@ -189,19 +189,19 @@ interface FileStatusAcceptor {
* iterator returned.
*/
static final class SingleStatusRemoteIterator
implements RemoteIterator<LocatedFileStatus> {
implements RemoteIterator<S3LocatedFileStatus> {

/**
* The status to return; set to null after the first iteration.
*/
private LocatedFileStatus status;
private S3LocatedFileStatus status;

/**
* Constructor.
* @param status status value: may be null, in which case
* the iterator is empty.
*/
public SingleStatusRemoteIterator(LocatedFileStatus status) {
public SingleStatusRemoteIterator(S3LocatedFileStatus status) {
this.status = status;
}

Expand All @@ -226,9 +226,9 @@ public boolean hasNext() throws IOException {
* to the constructor.
*/
@Override
public LocatedFileStatus next() throws IOException {
public S3LocatedFileStatus next() throws IOException {
if (hasNext()) {
LocatedFileStatus s = this.status;
S3LocatedFileStatus s = this.status;
status = null;
return s;
} else {
Expand All @@ -247,16 +247,16 @@ public LocatedFileStatus next() throws IOException {
* There is no remote data to fetch.
*/
static class ProvidedFileStatusIterator
implements RemoteIterator<FileStatus> {
private final ArrayList<FileStatus> filteredStatusList;
implements RemoteIterator<S3AFileStatus> {
private final ArrayList<S3AFileStatus> filteredStatusList;
private int index = 0;

ProvidedFileStatusIterator(FileStatus[] fileStatuses, PathFilter filter,
ProvidedFileStatusIterator(S3AFileStatus[] fileStatuses, PathFilter filter,
FileStatusAcceptor acceptor) {
Preconditions.checkArgument(fileStatuses != null, "Null status list!");

filteredStatusList = new ArrayList<>(fileStatuses.length);
for (FileStatus status : fileStatuses) {
for (S3AFileStatus status : fileStatuses) {
if (filter.accept(status.getPath()) && acceptor.accept(status)) {
filteredStatusList.add(status);
}
Expand All @@ -270,7 +270,7 @@ public boolean hasNext() throws IOException {
}

@Override
public FileStatus next() throws IOException {
public S3AFileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
Expand Down Expand Up @@ -305,7 +305,7 @@ public FileStatus next() throws IOException {
* Thread safety: None.
*/
class FileStatusListingIterator
implements RemoteIterator<FileStatus> {
implements RemoteIterator<S3AFileStatus> {

/** Source of objects. */
private final ObjectListingIterator source;
Expand All @@ -316,10 +316,10 @@ class FileStatusListingIterator
/** request batch size. */
private int batchSize;
/** Iterator over the current set of results. */
private ListIterator<FileStatus> statusBatchIterator;
private ListIterator<S3AFileStatus> statusBatchIterator;

private final Set<FileStatus> providedStatus;
private Iterator<FileStatus> providedStatusIterator;
private final Set<S3AFileStatus> providedStatus;
private Iterator<S3AFileStatus> providedStatusIterator;

/**
* Create an iterator over file status entries.
Expand All @@ -335,13 +335,13 @@ class FileStatusListingIterator
FileStatusListingIterator(ObjectListingIterator source,
PathFilter filter,
FileStatusAcceptor acceptor,
RemoteIterator<FileStatus> providedStatus) throws IOException {
RemoteIterator<S3AFileStatus> providedStatus) throws IOException {
this.source = source;
this.filter = filter;
this.acceptor = acceptor;
this.providedStatus = new HashSet<>();
for (; providedStatus != null && providedStatus.hasNext();) {
final FileStatus status = providedStatus.next();
final S3AFileStatus status = providedStatus.next();
if (filter.accept(status.getPath()) && acceptor.accept(status)) {
this.providedStatus.add(status);
}
Expand Down Expand Up @@ -384,8 +384,8 @@ private boolean sourceHasNext() throws IOException {

@Override
@Retries.RetryTranslated
public FileStatus next() throws IOException {
final FileStatus status;
public S3AFileStatus next() throws IOException {
final S3AFileStatus status;
if (sourceHasNext()) {
status = statusBatchIterator.next();
// We remove from provided list the file status listed by S3 so that
Expand Down Expand Up @@ -441,7 +441,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
// counters for debug logs
int added = 0, ignored = 0;
// list to fill in with results. Initial size will be list maximum.
List<FileStatus> stats = new ArrayList<>(
List<S3AFileStatus> stats = new ArrayList<>(
objects.getObjectSummaries().size() +
objects.getCommonPrefixes().size());
// objects
Expand All @@ -453,8 +453,9 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
}
// Skip over keys that are ourselves and old S3N _$folder$ files
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
FileStatus status = createFileStatus(keyPath, summary,
owner.getDefaultBlockSize(keyPath), owner.getUsername());
S3AFileStatus status = createFileStatus(keyPath, summary,
owner.getDefaultBlockSize(keyPath), owner.getUsername(),
null, null);
LOG.debug("Adding: {}", status);
stats.add(status);
added++;
Expand All @@ -468,7 +469,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = owner.keyToQualifiedPath(prefix);
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
FileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
owner.getUsername());
LOG.debug("Adding directory: {}", status);
added++;
Expand Down Expand Up @@ -679,14 +680,14 @@ public boolean accept(FileStatus status) {
* return a remote iterator of {@link LocatedFileStatus} instances.
*/
class LocatedFileStatusIterator
implements RemoteIterator<LocatedFileStatus> {
private final RemoteIterator<FileStatus> statusIterator;
implements RemoteIterator<S3LocatedFileStatus> {
private final RemoteIterator<S3AFileStatus> statusIterator;

/**
* Constructor.
* @param statusIterator an iterator over the remote status entries
*/
LocatedFileStatusIterator(RemoteIterator<FileStatus> statusIterator) {
LocatedFileStatusIterator(RemoteIterator<S3AFileStatus> statusIterator) {
this.statusIterator = statusIterator;
}

Expand All @@ -696,7 +697,7 @@ public boolean hasNext() throws IOException {
}

@Override
public LocatedFileStatus next() throws IOException {
public S3LocatedFileStatus next() throws IOException {
return owner.toLocatedFileStatus(statusIterator.next());
}
}
Expand All @@ -708,16 +709,16 @@ public LocatedFileStatus next() throws IOException {
* remain in the source iterator.
*/
static class TombstoneReconcilingIterator implements
RemoteIterator<LocatedFileStatus> {
private LocatedFileStatus next = null;
private final RemoteIterator<LocatedFileStatus> iterator;
RemoteIterator<S3LocatedFileStatus> {
private S3LocatedFileStatus next = null;
private final RemoteIterator<S3LocatedFileStatus> iterator;
private final Set<Path> tombstones;

/**
* @param iterator Source iterator to filter
* @param tombstones set of tombstone markers to filter out of results
*/
TombstoneReconcilingIterator(RemoteIterator<LocatedFileStatus>
TombstoneReconcilingIterator(RemoteIterator<S3LocatedFileStatus>
iterator, Set<Path> tombstones) {
this.iterator = iterator;
if (tombstones != null) {
Expand All @@ -729,7 +730,7 @@ static class TombstoneReconcilingIterator implements

private boolean fetch() throws IOException {
while (next == null && iterator.hasNext()) {
LocatedFileStatus candidate = iterator.next();
S3LocatedFileStatus candidate = iterator.next();
if (!tombstones.contains(candidate.getPath())) {
next = candidate;
return true;
Expand All @@ -745,9 +746,9 @@ public boolean hasNext() throws IOException {
return fetch();
}

public LocatedFileStatus next() throws IOException {
public S3LocatedFileStatus next() throws IOException {
if (hasNext()) {
LocatedFileStatus result = next;
S3LocatedFileStatus result = next;
next = null;
fetch();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
@InterfaceStability.Unstable
public class RemoteFileChangedException extends PathIOException {

public static final String PRECONDITIONS_NOT_MET =
"Constraints of request were unsatisfiable";

/**
* Constructs a RemoteFileChangedException.
*
Expand All @@ -46,4 +49,21 @@ public RemoteFileChangedException(String path,
super(path, message);
setOperation(operation);
}

/**
* Constructs a RemoteFileChangedException.
*
* @param path the path accessed when the change was detected
* @param operation the operation (e.g. open, re-open) performed when the
* change was detected
* @param message a message providing more details about the condition
* @param cause inner cause.
*/
public RemoteFileChangedException(String path,
String operation,
String message,
Throwable cause) {
super(path, message, cause);
setOperation(operation);
}
}
Loading