Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
899c790
100 continue support
anmolanmol1234 Mar 27, 2023
b9fa9b5
accountThrottling
anmolanmol1234 May 15, 2023
1bc98ce
100 continue
anmolanmol1234 Mar 28, 2023
0159117
Code fixes
anmolanmol1234 May 16, 2023
d64c280
Update checkstyle-suppressions.xml
anmolanmol1234 May 17, 2023
66ac70f
Mkdir with blob GFS
anmolanmol1234 May 25, 2023
688d33c
Test fix
anmolanmol1234 May 25, 2023
80527bf
Test fix
anmolanmol1234 May 25, 2023
051bd75
Read API support on blob endpoint
sreeb-msft May 25, 2023
2a5a7b7
Revert get file status change
anmolanmol1234 May 26, 2023
1dcf54b
Not needed in this PR
anmolanmol1234 May 26, 2023
f42fedd
Merge branch 'ABFS_3.3.2_dev' of https://github.com/ABFSDriver/AbfsHa…
sreeb-msft May 29, 2023
42f3cf6
AddingFallback to dfs for read
sreeb-msft May 29, 2023
9761c63
Mkdir PR comments
anmolanmol1234 May 29, 2023
5846d0b
Get-set XAttr Over Blob Endpoint
May 16, 2023
39c873d
Fixed SAS Delegator Typo
May 16, 2023
baac744
Enhanced Tests
May 16, 2023
04f5fdf
Addressing Comments
May 22, 2023
df8d464
Addressed Comments
May 22, 2023
843348f
Addressing Comments
May 24, 2023
d97efb1
Added javadoc
May 24, 2023
2e4dc07
Removed Unused Imports
May 24, 2023
0bc69e4
GetFileStatus on blob endpoint
May 26, 2023
80511d8
Added tests for getFileStatusOnBlobEndpoint
May 26, 2023
7b237aa
Fix getFileSystem on implicit path on blob endpoint
May 26, 2023
4e008d5
Added Tests For GetFileStatus Implicit/Explicit
May 28, 2023
5920b61
Modified tests for unicode chars in get-set attributes code
May 29, 2023
668c3b2
Fixed Failing Tests
May 29, 2023
e5200a0
getXAttr() on implicit path
May 29, 2023
cee90ef
Address Comments
May 29, 2023
897a34b
Metadata fallback
sreeb-msft May 30, 2023
d19f99a
Added Fallback For API calls
May 30, 2023
97b6587
Fix spacing issues
anmolanmol1234 May 30, 2023
da06a9f
Refactoring test
anmolanmol1234 May 30, 2023
c504459
Merge branch 'ABFS_3.3.2_dev' into mkdirGFS
anmolanmol1234 May 30, 2023
643755c
Refactoring test
anmolanmol1234 May 30, 2023
791f9da
Merge branch 'mkdirGFS' of https://github.com/ABFSDriver/AbfsHadoop i…
anmolanmol1234 May 30, 2023
8a57e91
handling for implicit dir
sreeb-msft May 30, 2023
dc07ff7
Revert Bak Fallback
May 30, 2023
ddd842f
Merge pull request #53 from ABFSDriver/getsetPropBlob
anujmodi2021 May 30, 2023
912f258
Merge pull request #57 from ABFSDriver/mkdirGFS
anmolanmol1234 May 30, 2023
6f8ed8a
Merge branch 'ABFS_3.3.2_dev' into 100continuesupport
anmolanmol1234 May 30, 2023
5b69780
Added tests
sreeb-msft May 30, 2023
7c96cce
Merge branch 'ABFS_3.3.2_dev' into readBlobEndpointSupport
sreeb-msft May 30, 2023
a078b8e
Fix spacing issue
anmolanmol1234 May 30, 2023
b0b855c
Merge branch '100continuesupport' of https://github.com/ABFSDriver/Ab…
anmolanmol1234 May 30, 2023
efac78c
PR comments
anmolanmol1234 May 30, 2023
b2f496b
Fix for failing test on dfs endpoint
May 31, 2023
b558a4c
Merge pull request #54 from ABFSDriver/100continuesupport
anmolanmol1234 May 31, 2023
a32f774
Handling for 404 in store
sreeb-msft May 31, 2023
b8b6a7d
Merge branch 'readBlobEndpointSupport' of https://github.com/ABFSDriv…
sreeb-msft May 31, 2023
2ce5484
Update user agent
sreeb-msft May 31, 2023
481f2ac
Merge pull request #59 from ABFSDriver/testFix
anujmodi2021 May 31, 2023
cffb436
Merge pull request #60 from ABFSDriver/userAgentUpdate
anmolanmol1234 May 31, 2023
f19e95a
Removed setter
sreeb-msft May 31, 2023
d204dd6
Throwing exception directly
sreeb-msft May 31, 2023
20713a2
Merge branch 'ABFS_3.3.2_dev' of https://github.com/ABFSDriver/AbfsHa…
sreeb-msft May 31, 2023
141ac49
Move weakreferencemap in utils
anmolanmol1234 May 31, 2023
cfe9e4d
remove utils
anmolanmol1234 May 31, 2023
eb246d9
Merge pull request #56 from ABFSDriver/readBlobEndpointSupport
sreeb-msft May 31, 2023
6e0767c
renaming TestAbfsClient
sreeb-msft May 31, 2023
2048210
Merge pull request #63 from ABFSDriver/renameTestAbfsClient
anmolanmol1234 May 31, 2023
28faec1
Merge pull request #61 from ABFSDriver/100continuesupport
anmolanmol1234 May 31, 2023
a01318a
renameBlobExecutorService to not have availableProcessor amount of th…
saxenapranav Jun 6, 2023
bbf03cc
no consumer lag; take from queue size
saxenapranav Jun 6, 2023
13c531e
executorService for each rename. Kill executorService once rename is …
saxenapranav Jun 6, 2023
3774ddd
lease hierarchy WIP
saxenapranav Jun 6, 2023
484ee17
process of renew added; ABfssDfsLease child of AbfsLease; AbfsLease i…
saxenapranav Jun 6, 2023
b08bdc6
AbfsBlobLease to be child of AbfsLease
saxenapranav Jun 6, 2023
e676aa2
isLeaseOnCreateNonRecursive on config
saxenapranav Jun 6, 2023
40be44e
no spawn of thread for non-infinite lease acquire
saxenapranav Jun 7, 2023
a9547df
seconds duration, javadoc, test fix
saxenapranav Jun 7, 2023
26b9958
fix tests in ItestListBlobProducer
saxenapranav Jun 7, 2023
7994196
have config checks in createNonRecursive test
saxenapranav Jun 7, 2023
5db2faa
Merge branch 'ABFS_3.3.2_dev' into ABFS_3.3.2_dev_rename_improvements…
saxenapranav Jun 7, 2023
8e6b9cf
test fix in ITestAzureBlobFileSystemLease
saxenapranav Jun 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,11 @@
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
<suppress checks="ParameterNumber|VisibilityModifier"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
<suppress checks="VisibilityModifier"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]ITestAbfsRestOperation.java"/>
<!-- allow tests to use _ for ordering. -->
<suppress checks="MethodName"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentati
LOG.debug("Page blob directories: {}", setToString(pageBlobDirs));

// User-agent
userAgentId = "wasbdriverV2.1";
userAgentId = "wasbdriverV2.2";

// Extract the directories that should contain block blobs with compaction
blockBlobWithCompationDirs = getDirectorySet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.reflect.Field;

import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -119,6 +120,15 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
private boolean isExpectHeaderEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
private boolean accountThrottlingEnabled;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
Expand Down Expand Up @@ -246,7 +256,7 @@ public class AbfsConfiguration{
private int readAheadQueueDepth;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD,
DefaultValue = 0)
DefaultValue = DEFAULT_FS_AZURE_BLOB_RENAME_THREAD)
private int blobDirRenameMaxThread;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_BLOB_COPY_PROGRESS_POLL_WAIT_MILLIS,
Expand Down Expand Up @@ -275,6 +285,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
private int accountOperationIdleTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ANALYSIS_PERIOD,
DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS)
private int analysisPeriod;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
DefaultValue = DEFAULT_FS_AZURE_USER_AGENT_PREFIX)
private String userAgentId;
Expand Down Expand Up @@ -326,8 +344,12 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
private boolean enableAbfsListIterator;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_MAX_CONSUMER_LAG, DefaultValue = DEFAULT_FS_AZURE_MAX_CONSUMER_LAG)
private int maximumConsumerLag;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, DefaultValue = DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE)
private int producerQueueMaxSize;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey=FS_AZURE_LEASE_CREATE_NON_RECURSIVE, DefaultValue = DEFAULT_FS_AZURE_LEASE_CREATE_NON_RECURSIVE)
private boolean leaseOnCreateNonRecursive;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
Expand Down Expand Up @@ -408,6 +430,11 @@ public boolean shouldEnableBlobEndPoint() {
DefaultValue = DEFAULT_FS_AZURE_INGRESS_FALLBACK_TO_DFS)
private boolean ingressFallbackToDfs;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_READ_FALLBACK_TO_DFS,
DefaultValue = DEFAULT_AZURE_READ_FALLBACK_TO_DFS)
private boolean readFallbackToDfs;

/**
 * Reports the {@code mkdirFallbackToDfs} setting held by this configuration.
 *
 * @return the current value of the {@code mkdirFallbackToDfs} flag
 */
public boolean shouldMkdirFallbackToDfs() {
  return this.mkdirFallbackToDfs;
}
Expand All @@ -416,6 +443,10 @@ public boolean shouldIngressFallbackToDfs() {
return ingressFallbackToDfs;
}

/**
 * Reports the {@code readFallbackToDfs} setting, which is bound to the
 * {@code FS_AZURE_READ_FALLBACK_TO_DFS} configuration key.
 *
 * @return the current value of the {@code readFallbackToDfs} flag
 */
public boolean shouldReadFallbackToDfs() {
  return this.readFallbackToDfs;
}

/**
* Gets the Azure Storage account name corresponding to this instance of configuration.
* @return the Azure Storage account name
Expand Down Expand Up @@ -768,6 +799,14 @@ public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}

/**
 * Reports the {@code isExpectHeaderEnabled} setting, which is bound to the
 * {@code FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED} configuration key.
 *
 * @return the current value of the {@code isExpectHeaderEnabled} flag
 */
public boolean isExpectHeaderEnabled() {
  return isExpectHeaderEnabled;
}

/**
 * Reports the {@code accountThrottlingEnabled} setting, which is bound to the
 * {@code FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED} configuration key.
 *
 * @return the current value of the {@code accountThrottlingEnabled} flag
 */
public boolean accountThrottlingEnabled() {
  return this.accountThrottlingEnabled;
}

/**
 * Returns the {@code azureInfiniteLeaseDirs} string held by this configuration.
 *
 * @return the current value of {@code azureInfiniteLeaseDirs}
 */
public String getAzureInfiniteLeaseDirs() {
  return azureInfiniteLeaseDirs;
}
Expand Down Expand Up @@ -810,8 +849,16 @@ public boolean isAutoThrottlingEnabled() {
return this.enableAutoThrottling;
}

/**
 * Returns the {@code accountOperationIdleTimeout} setting, which is bound to
 * the {@code FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT} configuration key.
 * NOTE(review): presumably milliseconds, judging by the {@code _MS} suffix of
 * the default constant; confirm.
 *
 * @return the configured account operation idle timeout
 */
public int getAccountOperationIdleTimeout() {
  return this.accountOperationIdleTimeout;
}

/**
 * Returns the {@code analysisPeriod} setting, which is bound to the
 * {@code FS_AZURE_ANALYSIS_PERIOD} configuration key.
 * NOTE(review): presumably milliseconds, judging by the {@code _MS} suffix of
 * the default constant; confirm.
 *
 * @return the configured analysis period
 */
public int getAnalysisPeriod() {
  return this.analysisPeriod;
}

public String getCustomUserAgentPrefix() {
return "abfsdriverV2.1";
return "abfsdriverV2.2";
}

public String getClusterName() {
Expand Down Expand Up @@ -1184,8 +1231,11 @@ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
this.enableAbfsListIterator = enableAbfsListIterator;
}

public int getMaximumConsumerLag() {
return maximumConsumerLag;
public int getProducerQueueMaxSize() {
return producerQueueMaxSize;
}

/**
 * Reports the {@code leaseOnCreateNonRecursive} setting, which is bound to the
 * {@code FS_AZURE_LEASE_CREATE_NON_RECURSIVE} configuration key.
 *
 * @return the current value of the {@code leaseOnCreateNonRecursive} flag
 */
public boolean isLeaseOnCreateNonRecursive() {
  return this.leaseOnCreateNonRecursive;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.util.Hashtable;
import java.util.List;
Expand Down Expand Up @@ -58,7 +59,6 @@

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -114,6 +114,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_LEASE_ONE_MINUTE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_BLOB_ENDPOINT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DNS_PREFIX;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.WASB_DNS_PREFIX;
Expand All @@ -126,6 +127,8 @@
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.PATH_EXISTS;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.decodeMetadataAttribute;
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.encodeMetadataAttribute;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
Expand Down Expand Up @@ -262,7 +265,6 @@ public void initialize(URI uri, Configuration configuration)
}
}

AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
boolean isRedirect = abfsConfiguration.isRedirection();
if (isRedirect) {
String abfsUrl = uri.toString();
Expand Down Expand Up @@ -365,7 +367,7 @@ private FSDataInputStream open(final Path path,
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.OPEN, tracingHeaderFormat,
listener);
InputStream inputStream = abfsStore.openFileForRead(qualifiedPath,
InputStream inputStream = getAbfsStore().openFileForRead(qualifiedPath,
options, statistics, tracingContext);
return new FSDataInputStream(inputStream);
} catch(AzureBlobFileSystemException ex) {
Expand Down Expand Up @@ -510,8 +512,10 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
String parentPath = parent.toUri().getPath();
if (getAbfsStore().getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB
&& getAbfsStore().isAtomicRenameKey(parentPath)) {
abfsBlobLease = new AbfsBlobLease(getAbfsClient(),
parentPath, tracingContext);
if(getAbfsStore().getAbfsConfiguration().isLeaseOnCreateNonRecursive()) {
abfsBlobLease = new AbfsBlobLease(getAbfsClient(),
parentPath, BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext);
}
}
final FileStatus parentFileStatus = tryGetFileStatus(parent, tracingContext);

Expand Down Expand Up @@ -1046,10 +1050,21 @@ private FileStatus getFileStatus(final Path path,
LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", path);
statIncrement(CALL_GET_FILE_STATUS);
Path qualifiedPath = makeQualified(path);
FileStatus fileStatus;

try {
FileStatus fileStatus = abfsStore.getFileStatus(qualifiedPath,
tracingContext);
if (abfsStore.getPrefixMode() == PrefixMode.BLOB) {
/**
* Get File Status over Blob Endpoint will Have an additional call
* to check if directory is implicit.
*/
fileStatus = abfsStore.getFileStatusOverBlob(qualifiedPath,
tracingContext);
}
else {
fileStatus = abfsStore.getFileStatus(qualifiedPath,
tracingContext);
}
if (getAbfsStore().getAbfsConfiguration().getPrefixMode()
== PrefixMode.BLOB && fileStatus != null && fileStatus.isDirectory()
&&
Expand Down Expand Up @@ -1314,13 +1329,30 @@ public void setXAttr(final Path path, final String name, final byte[] value, fin
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.SET_ATTR, true, tracingHeaderFormat,
listener);
Hashtable<String, String> properties = abfsStore
.getPathStatus(qualifiedPath, tracingContext);
Hashtable<String, String> properties;
String xAttrName = ensureValidAttributeName(name);
String xAttrValue;

if (abfsStore.getPrefixMode() == PrefixMode.BLOB) {
properties = abfsStore.getBlobMetadata(qualifiedPath, tracingContext);

boolean xAttrExists = properties.containsKey(xAttrName);
XAttrSetFlag.validate(name, xAttrExists, flag);

// On the Blob endpoint, metadata is passed as HTTP request headers, so the
// UTF-8 bytes are decoded into a String and then URL-encoded before being set.
xAttrValue = encodeMetadataAttribute(new String(value, StandardCharsets.UTF_8));
properties.put(xAttrName, xAttrValue);
abfsStore.setBlobMetadata(qualifiedPath, properties, tracingContext);

return;
}

properties = abfsStore.getPathStatus(qualifiedPath, tracingContext);
boolean xAttrExists = properties.containsKey(xAttrName);
XAttrSetFlag.validate(name, xAttrExists, flag);

String xAttrValue = abfsStore.decodeAttribute(value);
xAttrValue = abfsStore.decodeAttribute(value);
properties.put(xAttrName, xAttrValue);
abfsStore.setPathProperties(qualifiedPath, properties, tracingContext);
} catch (AzureBlobFileSystemException ex) {
Expand Down Expand Up @@ -1354,9 +1386,21 @@ public byte[] getXAttr(final Path path, final String name)
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.GET_ATTR, true, tracingHeaderFormat,
listener);
Hashtable<String, String> properties = abfsStore
.getPathStatus(qualifiedPath, tracingContext);
Hashtable<String, String> properties;
String xAttrName = ensureValidAttributeName(name);

if (abfsStore.getPrefixMode() == PrefixMode.BLOB) {
properties = abfsStore.getBlobMetadata(qualifiedPath, tracingContext);
if (properties.containsKey(xAttrName)) {
String xAttrValue = properties.get(xAttrName);
value = decodeMetadataAttribute(xAttrValue).getBytes(
StandardCharsets.UTF_8);
}
return value;
}

properties = abfsStore.getPathStatus(qualifiedPath, tracingContext);

if (properties.containsKey(xAttrName)) {
String xAttrValue = properties.get(xAttrName);
value = abfsStore.encodeAttribute(xAttrValue);
Expand Down Expand Up @@ -2022,4 +2066,5 @@ public boolean hasPathCapability(final Path path, final String capability)
public IOStatistics getIOStatistics() {
  // Without a counters instance there are no statistics to hand back.
  if (abfsCounters == null) {
    return null;
  }
  // Otherwise delegate to the counters object.
  return abfsCounters.getIOStatistics();
}

}
Loading