Skip to content

Commit

Permalink
[BugFix] Cancel fragment instance not query when close external scann…
Browse files Browse the repository at this point in the history
…er (#20264)

* [BugFix] Cancel the fragment instance, not the query, when closing an external scanner

If the query is cancelled, the other fragment instances (scanners) are cancelled as well,
and the Spark connector reports a SocketTimeoutException when it tries to get the next data from BE.

Queries executed by external connectors do not need to report exec state
to FE.

Signed-off-by: wyb <[email protected]>
(cherry picked from commit 907dd9d)
  • Loading branch information
wyb authored and wanpengfei-git committed Mar 27, 2023
1 parent 6d57a55 commit d303de9
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 6 deletions.
6 changes: 6 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
_update_profile_by_level(query_ctx, fragment_ctx, done);
auto params = ExecStateReporter::create_report_exec_status_params(query_ctx, fragment_ctx, status, done);
auto fe_addr = fragment_ctx->fe_addr();
if (fe_addr.hostname.empty()) {
// query executed by external connectors, like spark and flink connector,
// does not need to report exec state to FE, so return if fe addr is empty.
return;
}

auto exec_env = fragment_ctx->runtime_state()->exec_env();
auto fragment_id = fragment_ctx->fragment_instance_id();

Expand Down
9 changes: 7 additions & 2 deletions be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Status MemoryScratchSinkOperator::set_cancelled(RuntimeState* state) {
_pending_result.reset();
_is_finished = true;
_has_put_sentinel = true;
_queue->update_status(Status::Cancelled("Set cancelled by MemoryScratchSinkOperator"));
return Status::OK();
}

Expand All @@ -60,8 +61,12 @@ Status MemoryScratchSinkOperator::push_chunk(RuntimeState* state, const vectoriz
return Status::OK();
}
std::shared_ptr<arrow::RecordBatch> result;
RETURN_IF_ERROR(convert_chunk_to_arrow_batch(chunk.get(), _output_expr_ctxs, _arrow_schema,
arrow::default_memory_pool(), &result));
auto status = convert_chunk_to_arrow_batch(chunk.get(), _output_expr_ctxs, _arrow_schema,
arrow::default_memory_pool(), &result);
if (!status.ok()) {
_queue->update_status(status);
return status;
}

if (!_queue->try_put(result)) {
DCHECK(_pending_result == nullptr);
Expand Down
14 changes: 10 additions & 4 deletions be/src/runtime/external_scan_context_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,25 @@ Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id)
context = iter->second;
if (context == nullptr) {
_active_contexts.erase(context_id);
Status::OK();
return Status::OK();
}
iter = _active_contexts.erase(iter);
}
}
if (context != nullptr) {
// cancel pipeline
const auto& fragment_instance_id = context->fragment_instance_id;
if (auto query_ctx = _exec_env->query_context_mgr()->get(context->query_id); query_ctx != nullptr) {
query_ctx->cancel(Status::Cancelled("user cancelled"));
if (auto fragment_ctx = query_ctx->fragment_mgr()->get(fragment_instance_id); fragment_ctx != nullptr) {
std::stringstream msg;
msg << "FragmentContext(id=" << print_id(fragment_instance_id) << ") cancelled by close_scanner";
fragment_ctx->cancel(Status::Cancelled(msg.str()));
}
}
// clear the fragment instance's related result queue
_exec_env->result_queue_mgr()->cancel(context->fragment_instance_id);
LOG(INFO) << "close scan context: context id [ " << context_id << " ]";
_exec_env->result_queue_mgr()->cancel(fragment_instance_id);
LOG(INFO) << "close scan context: context id [ " << context_id << " ], fragment instance id [ "
<< print_id(fragment_instance_id) << " ]";
}
return Status::OK();
}
Expand Down

0 comments on commit d303de9

Please sign in to comment.