Do not throw from PartitionedOutputBufferManager::getData #3502
karteekmurthys wants to merge 4 commits into facebookincubator:main
There are multiple places that call getBuffer. It is not clear why this particular place should handle the exception and others should not. Would you clarify?
Similar to the other PR, we should avoid the programming-by-exception pattern and instead introduce a method getBufferIfExists that returns nullptr if the buffer doesn't exist. This method can then be used whenever the caller wants to handle a missing buffer (i.e. deleteResults and maybe here).
The method documentation needs to be updated to describe the new behavior, and a test needs to be added.
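A minimal sketch of the suggested pattern, for illustration only. `Buffer` and `BufferManager` are simplified, hypothetical stand-ins and not the actual Velox classes; only the names getBuffer, getBufferIfExists and deleteResults come from the discussion above.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for PartitionedOutputBuffer.
struct Buffer {
  int numDeletes = 0;
  void deleteResults(int /*destination*/) {
    ++numDeletes;
  }
};

// Simplified stand-in for the buffer manager (assumption, not Velox code).
class BufferManager {
 public:
  void add(const std::string& taskId) {
    std::lock_guard<std::mutex> l(mutex_);
    buffers_.emplace(taskId, std::make_shared<Buffer>());
  }

  // Returns the buffer for the task, or nullptr if the task is not found.
  std::shared_ptr<Buffer> getBufferIfExists(const std::string& taskId) {
    std::lock_guard<std::mutex> l(mutex_);
    auto it = buffers_.find(taskId);
    if (it == buffers_.end()) {
      return nullptr;
    }
    return it->second;
  }

  // Returns the buffer for the task. Throws if the task is not found.
  std::shared_ptr<Buffer> getBuffer(const std::string& taskId) {
    auto buffer = getBufferIfExists(taskId);
    if (!buffer) {
      throw std::runtime_error("Output buffers not found for task " + taskId);
    }
    return buffer;
  }

  // A caller that tolerates a missing buffer uses the non-throwing lookup.
  void deleteResults(const std::string& taskId, int destination) {
    if (auto buffer = getBufferIfExists(taskId)) {
      buffer->deleteResults(destination);
    }
  }

 private:
  std::mutex mutex_;
  std::unordered_map<std::string, std::shared_ptr<Buffer>> buffers_;
};
```

With this split, callers that treat a missing buffer as an error keep using getBuffer, while callers that expect it to be possible branch on the null result instead of catching an exception.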
mbasmanova left a comment
@karteekmurthys Karteek, thank you for iterating on this PR. The PR description explains the problem well. Thank you for taking the time to write it up.
This is misleading and must be handled correctly.
Agree, but would you elaborate on what it means to be "handled correctly"? In other words, would you update the description to explain the solution implemented here?
Some comments below.
I'm seeing PartitionedOutputBufferManagerTest crashing. Please, take a look.
  // Retrieves the set of buffers for a query.
  std::shared_ptr<PartitionedOutputBuffer> getBuffer(const std::string& taskId);

  std::shared_ptr<PartitionedOutputBuffer> getBufferIfExists(
This is a nice method. Please add an empty line before this method and document it. Let's use this method in PartitionedOutputBufferManager::deleteResults as well.
@@ -265,7 +284,8 @@ class PartitionedOutputBufferManager {
 private:
  // Retrieves the set of buffers for a query.
Let's clarify that this method throws if the buffer for the specified task does not exist.
I still think we should clarify that this method throws when the task is not found.
// Retrieves the set of buffers for the specified task. Throws buffers-not-found exception if not found.
// BufferManager::acknowledge.
using DataAvailableCallback = std::function<
    void(std::vector<std::unique_ptr<folly::IOBuf>> pages, int64_t sequence)>;
enum GetDataStatus { SUCCESS, ERR_BUFFER_NOT_FOUND };
Use enum class and kXxx naming for the values. See https://github.com/facebookincubator/velox/blob/main/CODING_STYLE.md
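As an illustration of that naming rule, the enum in the diff might be rewritten roughly as follows. The value names kSuccess and kBufferNotFound and the toString helper are assumptions made for this sketch; they are not taken from the PR.

```cpp
#include <cassert>
#include <string>

// Scoped enum with kXxx value names (hypothetical names for illustration).
enum class GetDataStatus { kSuccess, kBufferNotFound };

// Hypothetical helper showing that values must be qualified with the enum
// name and do not convert implicitly to int.
inline std::string toString(GetDataStatus status) {
  switch (status) {
    case GetDataStatus::kSuccess:
      return "kSuccess";
    case GetDataStatus::kBufferNotFound:
      return "kBufferNotFound";
  }
  return "unknown";
}
```

A scoped enum keeps the value names out of the enclosing namespace, which is one reason style guides tend to prefer it over a plain enum.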
  }
  std::vector<std::unique_ptr<folly::IOBuf>> pages;
  auto param = std::make_unique<DataAvailableCallbackParam>(
      std::move(pages), sequence, ERR_BUFFER_NOT_FOUND);
It would be simpler to just return 'false' to indicate that buffers for the task do not exist and therefore the 'notify' callback cannot be queued. See my other comment.
  }
}

TEST_F(PartitionedOutputBufferManagerTest, getDataOnTaskWithNoBuffer) {
nit: perhaps, rename to getDataOnFailedTask and add comments to explain the scenario this test is verifying.
@mbasmanova has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
mbasmanova left a comment
@karteekmurthys Looks good to me modulo a few small comments and the PR description, which needs an update to explain the solution implemented here.
    }
    return it->second;
  });
  auto buffer = getBufferIfExists(taskId);
nit: a cleaner way to write this is
if (auto buffer = getBufferIfExists(taskId)) {
  buffer->deleteResults(destination);
}
    int64_t sequence,
    DataAvailableCallback notify) {
  getBuffer(taskId)->getData(destination, maxBytes, sequence, notify);
  auto buffer = getBufferIfExists(taskId);
  // Retrieves the set of buffers for a query.
  std::shared_ptr<PartitionedOutputBuffer> getBuffer(const std::string& taskId);

  // Retrieves the set of buffers for a query if exists. If taskId is not found,
query -> task
"If taskId is not found," -> "Returns null if task not found."
  }
}

TEST_F(PartitionedOutputBufferManagerTest, getDataOnFailedTask) {
Let's update existing calls to getData to assert that they return true.
      1,
      [&notified](
          std::vector<std::unique_ptr<folly::IOBuf>> pages, int64_t sequence) {
        notified = true;
Let's simplify this using VELOX_UNREACHABLE();
  // manager or was removed by a parallel thread must return false. The `notify`
  // callback must not be registered.
  bool notified = false;
  auto ret = bufferManager_->getData(
mbasmanova left a comment
@karteekmurthys Thank you, Karteek.
@mbasmanova merged this pull request in a91b227.
A task may be executed by multiple drivers. If one of the drivers catches an
exception, it can terminate the task. This termination removes any
PartitionedOutputBuffer assigned to that taskId. In a parallel thread, the task
manager may try to access this buffer, leading to an "Output buffers not found"
exception. This exception is valid, but it may sometimes be thrown to the user
in place of the exception originally encountered by the driver, which is
misleading.
Instead of throwing an exception, getData() now returns a bool indicating
whether a matching buffer for the given task ID was found.
See #3009 for more context.
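The behavior described above can be sketched roughly as follows. This is a single-threaded illustration under stated assumptions: `Page` stands in for std::unique_ptr<folly::IOBuf>, `Buffer` and `BufferManager` are hypothetical simplified types, and the getData signature is shortened compared to the one in the diff.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Stand-in for std::unique_ptr<folly::IOBuf> (assumption for this sketch).
using Page = std::string;
using DataAvailableCallback =
    std::function<void(std::vector<Page> pages, int64_t sequence)>;

// Hypothetical simplified buffer: hands its pages to the callback at once.
struct Buffer {
  std::vector<Page> pages;
  void getData(int64_t sequence, DataAvailableCallback notify) {
    notify(pages, sequence);
  }
};

// Hypothetical simplified manager; locking is omitted for brevity.
class BufferManager {
 public:
  void add(const std::string& taskId, std::vector<Page> pages) {
    auto buffer = std::make_shared<Buffer>();
    buffer->pages = std::move(pages);
    buffers_[taskId] = std::move(buffer);
  }

  // Models task termination removing the task's buffers.
  void remove(const std::string& taskId) {
    buffers_.erase(taskId);
  }

  // Returns false without registering or invoking 'notify' if no buffer
  // exists for the task. Returns true otherwise.
  bool getData(
      const std::string& taskId,
      int64_t sequence,
      DataAvailableCallback notify) {
    auto it = buffers_.find(taskId);
    if (it == buffers_.end()) {
      return false;
    }
    it->second->getData(sequence, std::move(notify));
    return true;
  }

 private:
  std::unordered_map<std::string, std::shared_ptr<Buffer>> buffers_;
};
```

A caller can then check the return value and decide how to report the missing buffer, rather than letting an "Output buffers not found" exception mask the original failure.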