hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -33,7 +33,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
-import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -204,38 +204,51 @@ public void registerListener(Listener listener1) {
public FSDataInputStream open(final Path path, final int bufferSize) throws IOException {
LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize);
// bufferSize is unused.
-return open(path, Optional.empty());
+return open(path, new OpenFileParameters());
Contributor:

Concern about creating and passing an OpenFileParameters this way.
When the FS#openFile() API is used, you can see how the system creates the
OpenFileParameters(); some defaults come in there, e.g.:
bufferSize = fileSystem.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT);
So the correct buffer size per the conf will also be present in OpenFileParameters.
We might not be using the buffer size from the param now, but it is possible
we will use new fields like this later. So better not to create an
OpenFileParameters() here and pass it; instead, the private method can accept
an Optional. In open(Path), we can pass Optional.empty() anyway.
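For reference, a minimal sketch of what the reviewer describes (assumes the fluent builder methods of org.apache.hadoop.fs.impl.OpenFileParameters; the exact openFile() builder internals vary by Hadoop version):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.OpenFileParameters;

// Roughly what FileSystem#openFile() assembles before calling
// openFileWithOptions(path, parameters): the buffer size is defaulted
// from the conf, so the parameters object carries more than the options.
Configuration conf = new Configuration();
int bufferSize = conf.getInt("io.file.buffer.size", 4096); // IO_FILE_BUFFER_SIZE_KEY / _DEFAULT
OpenFileParameters parameters = new OpenFileParameters()
    .withOptions(conf)          // per-open options configuration
    .withBufferSize(bufferSize) // conf-derived default, per the comment above
    .withStatus(null);          // FileStatus, when the caller supplies one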

}

private FSDataInputStream open(final Path path,
-final Optional<Configuration> options) throws IOException {
+final OpenFileParameters parameters) throws IOException {
statIncrement(CALL_OPEN);
Path qualifiedPath = makeQualified(path);

try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
-fileSystemId, FSOperationType.OPEN, tracingHeaderFormat,
-listener);
-InputStream inputStream = abfsStore.openFileForRead(qualifiedPath,
-options, statistics, tracingContext);
+fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener);
+InputStream inputStream = abfsStore
+.openFileForRead(qualifiedPath, parameters, statistics,
+tracingContext);
return new FSDataInputStream(inputStream);
} catch(AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
}
}

+/**
+ * Takes config and other options through
+ * {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that the
+ * FileStatus passed in is up-to-date, as it will be used to create the
+ * InputStream (with info such as contentLength, eTag).
+ * @param path the location of the file to be opened
+ * @param parameters OpenFileParameters instance; can hold FileStatus,
+ * Configuration, bufferSize and mandatoryKeys
+ */
@Override
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
-final Path path, final OpenFileParameters parameters) throws IOException {
+final Path path, final OpenFileParameters parameters) {
LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path);
+Set<String> mandatoryKeys = parameters.getMandatoryKeys();
+if (mandatoryKeys == null) {
+mandatoryKeys = Collections.emptySet();
+}
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
-parameters.getMandatoryKeys(),
+mandatoryKeys,
Collections.emptySet(),
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
-open(path, Optional.of(parameters.getOptions())));
+open(path, parameters));
}
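A usage sketch of the path this enables (hypothetical snippet; assumes a Hadoop release whose FutureDataInputStreamBuilder supports withFileStatus):

// Reusing a FileStatus from listStatus() lets ABFS skip the extra
// GetPathStatus (HEAD) request when opening the file.
// build().get() can throw InterruptedException/ExecutionException.
FileStatus status = fs.listStatus(dir)[0];
try (FSDataInputStream in = fs.openFile(status.getPath())
    .withFileStatus(status) // carries length and, for ABFS, the eTag
    .build().get()) {
  in.read(new byte[16]); // contentLength/eTag come from the supplied status
}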

@Override

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -115,6 +115,7 @@
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
@@ -129,6 +130,8 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
@@ -669,44 +672,59 @@ public void createDirectory(final Path path, final FsPermission permission,

public AbfsInputStream openFileForRead(final Path path,
final FileSystem.Statistics statistics, TracingContext tracingContext)
-throws AzureBlobFileSystemException {
-return openFileForRead(path, Optional.empty(), statistics, tracingContext);
+throws IOException {
+return openFileForRead(path, new OpenFileParameters(), statistics,
+tracingContext);
}

public AbfsInputStream openFileForRead(final Path path,
-final Optional<Configuration> options,
+final OpenFileParameters parameters,
final FileSystem.Statistics statistics, TracingContext tracingContext)
-throws AzureBlobFileSystemException {
-try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) {
+throws IOException {
+try (AbfsPerfInfo perfInfo = startTracking("openFileForRead",
+"getPathStatus")) {
LOG.debug("openFileForRead filesystem: {} path: {}",
-client.getFileSystem(),
-path);
+client.getFileSystem(), path);

-String relativePath = getRelativePath(path);
-
-final AbfsRestOperation op = client
-.getPathStatus(relativePath, false, tracingContext);
-perfInfo.registerResult(op.getResult());
-
-final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+Optional<Configuration> options = Optional.empty();
+FileStatus fileStatus = null;
+if (parameters != null) {
+options = Optional.ofNullable(parameters.getOptions());
+fileStatus = parameters.getStatus();
+}
+
+String relativePath = getRelativePath(path);
+String resourceType, eTag;
+long contentLength;
+if (fileStatus != null) {
+resourceType = fileStatus.isFile() ? FILE : DIRECTORY;
+contentLength = fileStatus.getLen();
+eTag = ((VersionedFileStatus) fileStatus).getVersion();
+} else {
+AbfsHttpOperation op = client
+.getPathStatus(relativePath, false, tracingContext).getResult();
+resourceType = op
+.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+contentLength = Long.parseLong(
+op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+}

if (parseIsDirectory(resourceType)) {
throw new AbfsRestOperationException(
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
"openFileForRead must be used with files and not directories",
null);
}

Contributor:

is there a test for this?

Contributor Author (@sumangala-patki, Sep 8, 2021):

Yes, there is a contract test to verify that a FileNotFoundException is
thrown on an attempt to open a directory:
org.apache.hadoop.fs.contract.AbstractContractOpenTest#testOpenReadDir
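Roughly what that contract test exercises (paraphrased sketch, not the exact test body; helpers like path()/mkdirs()/fail() are from the contract test base classes and JUnit):

// Opening a directory must fail with FileNotFoundException; ABFS maps the
// PATH_NOT_FOUND rest error above to FNFE in checkException().
Path dir = path("testOpenReadDir");
mkdirs(dir);
try (FSDataInputStream in = getFileSystem().open(dir)) {
  fail("expected FileNotFoundException on open(directory)");
} catch (FileNotFoundException expected) {
  // expected outcome
}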

perfInfo.registerSuccess(true);

// Add statistics for InputStream
-return new AbfsInputStream(client, statistics,
-relativePath, contentLength,
-populateAbfsInputStreamContext(options),
-eTag, tracingContext);
+return new AbfsInputStream(client, statistics, relativePath,
+contentLength, populateAbfsInputStreamContext(options), eTag,
+tracingContext);
}
}
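Note the fast path assumes the supplied status is ABFS's own VersionedFileStatus (the FileStatus subclass carrying the blob eTag); a plain FileStatus from another source would fail the cast. A hypothetical defensive variant of that branch, sketch only:

// Trust the supplied status only when it carries a version (eTag);
// otherwise fall back to the GetPathStatus branch above.
if (fileStatus instanceof VersionedFileStatus) {
  eTag = ((VersionedFileStatus) fileStatus).getVersion();
} else {
  // fall back to client.getPathStatus(...) as in the else-branch
}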


hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

@@ -38,6 +38,7 @@
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
@@ -427,6 +428,15 @@ public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) {
return fs.getAbfsStore();
}

+public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) {
+return abfsStore.getClient();
+}
+
+public void setAbfsClient(AzureBlobFileSystemStore abfsStore,
+AbfsClient client) {
+abfsStore.setClient(client);
+}

public Path makeQualified(Path path) throws java.io.IOException {
return getFileSystem().makeQualified(path);
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java

@@ -19,31 +19,39 @@
package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;

-import org.junit.Assert;
-import org.junit.Test;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.impl.OpenFileParameters;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -192,6 +200,98 @@ public TestAbfsInputStream() throws Exception {
ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
}

private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException {
AzureBlobFileSystem fs = getFileSystem();
fs.create(testFile).close(); // close the create stream before appending
FSDataOutputStream out = fs.append(testFile);
out.write(buffer);
out.close();
}

private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus,
byte[] buf, AbfsRestOperationType source)
throws IOException, ExecutionException, InterruptedException {
byte[] readBuf = new byte[buf.length];
AzureBlobFileSystem fs = getFileSystem();
FutureDataInputStreamBuilder builder = fs.openFile(path);
builder.withFileStatus(fileStatus);
FSDataInputStream in = builder.build().get();
assertEquals(String.format(
"Open with fileStatus [from %s result]: Incorrect number of bytes read",
source), buf.length, in.read(readBuf));
assertArrayEquals(String
.format("Open with fileStatus [from %s result]: Incorrect read data",
source), readBuf, buf);

Contributor:

use AssertJ
}
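Per the review comment, an AssertJ version of those assertions might look like this (sketch; same semantics, and org.assertj.core.api.Assertions is already imported above):

// Same checks as the JUnit assertions, with self-describing messages.
Assertions.assertThat(in.read(readBuf))
    .describedAs("Open with fileStatus [from %s result]: bytes read", source)
    .isEqualTo(buf.length);
Assertions.assertThat(readBuf)
    .describedAs("Open with fileStatus [from %s result]: read data", source)
    .containsExactly(buf);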

private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus,
AzureBlobFileSystemStore abfsStore, AbfsClient mockClient,
AbfsRestOperationType source, TracingContext tracingContext)
throws IOException {

// verify GetPathStatus not invoked when FileStatus is provided
abfsStore.openFileForRead(testFile,
new OpenFileParameters().withStatus(fileStatus), null, tracingContext);
verify(mockClient, times(0).description(String.format(
"FileStatus [from %s result] provided, GetPathStatus should not be invoked",
source))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));

// verify GetPathStatus invoked when FileStatus not provided
abfsStore.openFileForRead(testFile, new OpenFileParameters(), null,
tracingContext);
verify(mockClient, times(1).description(
"GetPathStatus should be invoked when FileStatus not provided"))
.getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));

Mockito.reset(mockClient); //clears invocation count for next test case
}

@Test
public void testOpenFileWithOptions() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
String testFolder = "/testFolder";
Path smallTestFile = new Path(testFolder + "/testFile0");
Path largeTestFile = new Path(testFolder + "/testFile1");
fs.mkdirs(new Path(testFolder));
int readBufferSize = getConfiguration().getReadBufferSize();
byte[] smallBuffer = new byte[5];
byte[] largeBuffer = new byte[readBufferSize + 5];
new Random().nextBytes(smallBuffer);
new Random().nextBytes(largeBuffer);
writeBufferToNewFile(smallTestFile, smallBuffer);
writeBufferToNewFile(largeTestFile, largeBuffer);

FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile),
fs.getFileStatus(largeTestFile)};
FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder));

// open with fileStatus from GetPathStatus
verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0],
smallBuffer, AbfsRestOperationType.GetPathStatus);
verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1],
largeBuffer, AbfsRestOperationType.GetPathStatus);

// open with fileStatus from ListStatus
verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer,
AbfsRestOperationType.ListPaths);
verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer,
AbfsRestOperationType.ListPaths);

// verify number of GetPathStatus invocations
AzureBlobFileSystemStore abfsStore = getAbfsStore(fs);
AbfsClient mockClient = spy(getAbfsClient(abfsStore));
setAbfsClient(abfsStore, mockClient);
TracingContext tracingContext = getTestTracingContext(fs, false);
checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0],
abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1],
abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
checkGetPathStatusCalls(smallTestFile, listStatusResults[0],
abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
checkGetPathStatusCalls(largeTestFile, listStatusResults[1],
abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
}

/**
* This test expects AbfsInputStream to throw the exception that readAhead
* thread received on read. The readAhead thread must be initiated from the
Expand Down