-
Notifications
You must be signed in to change notification settings - Fork 0
Producer, consumer design for having listing and renaming in parallel. #52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f5b8fdd
0acee48
c02bc3e
2e4372d
b63dd44
9d1a271
295be5a
b66bee8
8ab8cf0
cea438e
05505c5
e7625c8
f222b80
4c7b31e
d624e87
de9d5cb
0b49c60
9b59f66
ba226ed
2d38311
8235c0e
2360293
7c5d520
0ba5fd4
dd187da
9065343
afa734e
04c1803
ec53bad
2e1a929
4fae0a7
63a099e
a01318a
bbf03cc
13c531e
3774ddd
484ee17
b08bdc6
e676aa2
40be44e
a9547df
26b9958
7994196
5db2faa
8e6b9cf
6744352
0d316c4
043944a
31b665e
443d416
e6718f8
81f1d43
06cf503
7d85a89
cf08e12
89b0ace
2ab43e3
a5420f4
c9f9c29
0eb5a94
85f393a
0903356
c5ae899
137eca9
3cc8c69
a47a508
8de26ce
51e7e5a
14050c1
7430b1e
aafc099
21e4e26
b8b1ef0
b5ec70d
3e8207e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,6 +44,7 @@ | |
| import java.util.concurrent.Future; | ||
|
|
||
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; | ||
| import org.apache.hadoop.fs.azurebfs.services.AbfsBlobLease; | ||
| import org.apache.hadoop.fs.azurebfs.services.BlobProperty; | ||
| import org.apache.hadoop.classification.VisibleForTesting; | ||
| import org.apache.hadoop.fs.azurebfs.services.OperativeEndpoint; | ||
|
|
@@ -114,6 +115,7 @@ | |
| import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL; | ||
| import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT; | ||
| import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; | ||
| import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_LEASE_ONE_MINUTE_DURATION; | ||
| import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_BLOB_ENDPOINT; | ||
| import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DNS_PREFIX; | ||
| import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.WASB_DNS_PREFIX; | ||
|
|
@@ -424,6 +426,29 @@ private boolean shouldRedirect(FSOperationType type, TracingContext context) | |
| @Override | ||
| public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, | ||
| final short replication, final long blockSize, final Progressable progress) throws IOException { | ||
| return create(f, permission, overwrite, bufferSize, replication, blockSize, | ||
| progress, false); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a file in the file system with the specified parameters. | ||
| * @param f the path of the file to create | ||
| * @param permission the permission of the file | ||
| * @param overwrite whether to overwrite the existing file if any | ||
| * @param bufferSize the size of the buffer to be used | ||
| * @param replication the number of replicas for the file | ||
| * @param blockSize the size of the block for the file | ||
| * @param progress the progress indicator for the file creation | ||
| * @param blobParentDirPresentChecked whether the presence of parent directory | ||
| * been checked | ||
| * @return a FSDataOutputStream object that can be used to write to the file | ||
| * @throws IOException if an error occurs while creating the file | ||
| */ | ||
| private FSDataOutputStream create(final Path f, | ||
| final FsPermission permission, | ||
| final boolean overwrite, final int bufferSize, | ||
| final short replication, | ||
| final long blockSize, final Progressable progress, final Boolean blobParentDirPresentChecked) throws IOException { | ||
| LOG.debug("AzureBlobFileSystem.create path: {} permission: {} overwrite: {} bufferSize: {}", | ||
| f, | ||
| permission, | ||
|
|
@@ -449,9 +474,11 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi | |
|
|
||
| if (prefixMode == PrefixMode.BLOB) { | ||
| validatePathOrSubPathDoesNotExist(qualifiedPath, tracingContext); | ||
| Path parent = qualifiedPath.getParent(); | ||
| if (parent != null && !parent.isRoot()) { | ||
| if (!blobParentDirPresentChecked) { | ||
| Path parent = qualifiedPath.getParent(); | ||
| if (parent != null && !parent.isRoot()) { | ||
| mkdirs(parent); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -478,14 +505,36 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe | |
| TracingContext tracingContext = new TracingContext(clientCorrelationId, | ||
| fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat, | ||
| listener); | ||
| /* | ||
| * Get exclusive access to folder if this is a directory designated for atomic | ||
| * rename. The primary use case of the HBase write-ahead log file management. | ||
| */ | ||
| AbfsBlobLease abfsBlobLease = null; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As currently v1 doesnt provide this, lets have this functionality in a config control and have it as off.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added for createNonRecursive lease acquire: fs.azure.lease.create.non.recursive. default: false |
||
| String parentPath = parent.toUri().getPath(); | ||
| if (getAbfsStore().getPrefixMode() == PrefixMode.BLOB | ||
| && getAbfsStore().isAtomicRenameKey(parentPath)) { | ||
| if (getAbfsStore().getAbfsConfiguration().isLeaseOnCreateNonRecursive()) { | ||
| abfsBlobLease = new AbfsBlobLease(getAbfsClient(), | ||
| parentPath, BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); | ||
| } | ||
| } | ||
| final FileStatus parentFileStatus = tryGetFileStatus(parent, tracingContext); | ||
|
|
||
| if (parentFileStatus == null) { | ||
| if (parentFileStatus == null || !parentFileStatus.isDirectory()) { | ||
| if (abfsBlobLease != null) { | ||
| abfsBlobLease.free(); | ||
| } | ||
| throw new FileNotFoundException("Cannot create file " | ||
| + f.getName() + " because parent folder does not exist."); | ||
| + f.getName() | ||
| + " because parent folder does not exist or is a file."); | ||
| } | ||
|
|
||
| return create(f, permission, overwrite, bufferSize, replication, blockSize, progress); | ||
| final FSDataOutputStream outputStream = create(f, permission, overwrite, | ||
| bufferSize, replication, blockSize, progress, true); | ||
| if (abfsBlobLease != null) { | ||
|
saxenapranav marked this conversation as resolved.
|
||
| abfsBlobLease.free(); | ||
| } | ||
| return outputStream; | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need for additional parameter ?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is also called from
createNonRecursivewhich already checks the parent dir and acquire lease on the parent dir if its an atomicDirectory. The new field will tell the method that the parent dir exists and has been checked.Why we dont want this method to check parentDir again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be great if this is added as a brief comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have added javadoc in the new method.