Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,43 +29,39 @@ static const std::string kTasksTable = "tasks";
} // namespace

const velox::RowTypePtr SystemTableHandle::taskSchema() const {
static std::vector<std::string> kTaskColumnNames = {
"node_id",
"task_id",
"stage_execution_id",
"stage_id",
"query_id",
"state",
"splits",
"queued_splits",
"running_splits",
"completed_splits",
"split_scheduled_time_ms",
"split_cpu_time_ms",
"split_blocked_time_ms",
"raw_input_bytes",
"raw_input_rows",
"processed_input_bytes",
"processed_input_rows",
"output_bytes",
"output_rows",
"physical_written_bytes",
"created",
"start",
"last_heartbeat",
"end"};

static std::vector<velox::TypePtr> kTaskColumnTypes = {
velox::VARCHAR(), velox::VARCHAR(), velox::VARCHAR(),
velox::VARCHAR(), velox::VARCHAR(), velox::VARCHAR(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::TIMESTAMP(),
velox::TIMESTAMP(), velox::TIMESTAMP(), velox::TIMESTAMP()};
static const RowTypePtr kTaskSchema =
ROW(std::move(kTaskColumnNames), std::move(kTaskColumnTypes));
ROW({"node_id",
"task_id",
"stage_execution_id",
"stage_id",
"query_id",
"state",
"splits",
"queued_splits",
"running_splits",
"completed_splits",
"split_scheduled_time_ms",
"split_cpu_time_ms",
"split_blocked_time_ms",
"raw_input_bytes",
"raw_input_rows",
"processed_input_bytes",
"processed_input_rows",
"output_bytes",
"output_rows",
"physical_written_bytes",
"created",
"start",
"last_heartbeat",
"end"},
{velox::VARCHAR(), velox::VARCHAR(), velox::VARCHAR(),
velox::VARCHAR(), velox::VARCHAR(), velox::VARCHAR(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::TIMESTAMP(),
velox::TIMESTAMP(), velox::TIMESTAMP(), velox::TIMESTAMP()});
return kTaskSchema;
}

Expand Down Expand Up @@ -140,24 +136,24 @@ void SystemDataSource::addSplit(
VELOX_CHECK(currentSplit_, "Wrong type of split for SystemDataSource.");
}

#define SET_TASK_COLUMN(value) \
int j = 0; \
for (const auto& taskEntry : taskMap) { \
auto task = taskEntry.second; \
auto taskInfo = taskInfos[j]; \
flat->set(j, value); \
j++; \
#define SET_TASK_COLUMN(value) \
int j = 0; \
for (const auto& taskEntry : taskMap) { \
[[maybe_unused]] const auto& task = taskEntry.second; \
[[maybe_unused]] const auto& taskInfo = taskInfos[j]; \
flat->set(j, value); \
j++; \
}

#define SET_TASK_FMT_COLUMN(value) \
int j = 0; \
std::string temp; \
for (const auto& taskEntry : taskMap) { \
auto task = taskEntry.second; \
auto taskInfo = taskInfos[j]; \
temp = fmt::format("{}", value); \
flat->set(j, StringView(temp)); \
j++; \
#define SET_TASK_FMT_COLUMN(value) \
int j = 0; \
std::string temp; \
for (const auto& taskEntry : taskMap) { \
[[maybe_unused]] const auto& task = taskEntry.second; \
[[maybe_unused]] const auto& taskInfo = taskInfos[j]; \
temp = fmt::format("{}", value); \
flat->set(j, StringView(temp)); \
j++; \
}

RowVectorPtr SystemDataSource::getTaskResults() {
Expand Down Expand Up @@ -326,11 +322,17 @@ RowVectorPtr SystemDataSource::getTaskResults() {
SET_TASK_COLUMN(velox::Timestamp::fromMillis(task->lastEndTimeMs));
break;
}

default:
VELOX_UNREACHABLE();
}
}
return result;
}

#undef SET_TASK_COLUMN
#undef SET_TASK_FMT_COLUMN

std::optional<RowVectorPtr> SystemDataSource::next(
uint64_t size,
velox::ContinueFuture& /*future*/) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ struct SystemSplit : public velox::connector::ConnectorSplit {
schemaName_(schemaName),
tableName_(tableName) {}

const std::string& schemaName() {
const std::string& schemaName() const {
return schemaName_;
}

const std::string& tableName() {
const std::string& tableName() const {
return tableName_;
}

Expand Down
Loading