From f0d1dd37d02752d399884162a72bfae350840e78 Mon Sep 17 00:00:00 2001 From: wyb Date: Mon, 27 Mar 2023 10:40:38 +0800 Subject: [PATCH] [BugFix] Cancel fragment instance not query when close external scanner (#20264) * [BugFix] cancel fragment instance not query when close external scanner If cancel query, other fragment instances (scanners) will be cancelled, and spark connector will report SocketTimeoutException when get next data from BE. Query executed by external connectors does not need to report exec state to FE. Signed-off-by: wyb (cherry picked from commit 907dd9d771d07e9c1e167a6a245497f2eb2c8788) --- be/src/exec/pipeline/pipeline_driver_executor.cpp | 6 ++++++ .../pipeline/sink/memory_scratch_sink_operator.cpp | 9 +++++++-- be/src/runtime/external_scan_context_mgr.cpp | 14 ++++++++++---- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/be/src/exec/pipeline/pipeline_driver_executor.cpp b/be/src/exec/pipeline/pipeline_driver_executor.cpp index 0d1b700915076..134583a367efe 100644 --- a/be/src/exec/pipeline/pipeline_driver_executor.cpp +++ b/be/src/exec/pipeline/pipeline_driver_executor.cpp @@ -207,6 +207,12 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo _update_profile_by_level(query_ctx, fragment_ctx, done); auto params = ExecStateReporter::create_report_exec_status_params(query_ctx, fragment_ctx, status, done); auto fe_addr = fragment_ctx->fe_addr(); + if (fe_addr.hostname.empty()) { + // query executed by external connectors, like spark and flink connector, + // does not need to report exec state to FE, so return if fe addr is empty. + return; + } + auto exec_env = fragment_ctx->runtime_state()->exec_env(); auto fragment_id = fragment_ctx->fragment_instance_id(); diff --git a/be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp b/be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp index 00dad21c764ac..5b71e165979bb 100644 --- a/be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp +++ b/be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp @@ -48,6 +48,7 @@ Status MemoryScratchSinkOperator::set_cancelled(RuntimeState* state) { _pending_result.reset(); _is_finished = true; _has_put_sentinel = true; + _queue->update_status(Status::Cancelled("Set cancelled by MemoryScratchSinkOperator")); return Status::OK(); } @@ -60,8 +61,12 @@ Status MemoryScratchSinkOperator::push_chunk(RuntimeState* state, const vectoriz return Status::OK(); } std::shared_ptr result; - RETURN_IF_ERROR(convert_chunk_to_arrow_batch(chunk.get(), _output_expr_ctxs, _arrow_schema, - arrow::default_memory_pool(), &result)); + auto status = convert_chunk_to_arrow_batch(chunk.get(), _output_expr_ctxs, _arrow_schema, + arrow::default_memory_pool(), &result); + if (!status.ok()) { + _queue->update_status(status); + return status; + } if (!_queue->try_put(result)) { DCHECK(_pending_result == nullptr); diff --git a/be/src/runtime/external_scan_context_mgr.cpp b/be/src/runtime/external_scan_context_mgr.cpp index 9925979922d80..c1a1750ba8b03 100644 --- a/be/src/runtime/external_scan_context_mgr.cpp +++ b/be/src/runtime/external_scan_context_mgr.cpp @@ -94,19 +94,25 @@ Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id) context = iter->second; if (context == nullptr) { _active_contexts.erase(context_id); - Status::OK(); + return Status::OK(); } iter = _active_contexts.erase(iter); } } if (context != nullptr) { // cancel pipeline + const auto& fragment_instance_id = context->fragment_instance_id; if (auto query_ctx = _exec_env->query_context_mgr()->get(context->query_id); query_ctx != nullptr) { - query_ctx->cancel(Status::Cancelled("user cancelled")); + if (auto fragment_ctx = query_ctx->fragment_mgr()->get(fragment_instance_id); fragment_ctx != nullptr) { + std::stringstream msg; + msg << "FragmentContext(id=" << print_id(fragment_instance_id) << ") cancelled by close_scanner"; + fragment_ctx->cancel(Status::Cancelled(msg.str())); + } } // clear the fragment instance's related result queue - _exec_env->result_queue_mgr()->cancel(context->fragment_instance_id); - LOG(INFO) << "close scan context: context id [ " << context_id << " ]"; + _exec_env->result_queue_mgr()->cancel(fragment_instance_id); + LOG(INFO) << "close scan context: context id [ " << context_id << " ], fragment instance id [ " + << print_id(fragment_instance_id) << " ]"; } return Status::OK(); }