[native] SystemConnector to query system.runtime.tasks table #21416
aditi-pandit merged 1 commit into master
Conversation
majetideepak left a comment:
@aditi-pandit nice change! some comments.
mbasmanova left a comment:
@aditi-pandit Is there a design document about this connector? I assume SystemConnector requires access to Metastore and we don't have that on the worker. Hence, wondering how this will work?
CC: @spershin
@mbasmanova: This SystemConnector code was to query the tasks table. That seemed to be the only part of the SystemConnector needed at the worker for Prestissimo. Other tables like nodes and queries were populated from in-memory structures in the coordinator itself. Any code accessing the Metastore (like TablePropertiesSystemTable, say) seemed to be required only in the coordinator part of the connector. I just spent a day on this prototype to wire the pieces together. I haven't put together a design doc.
mbasmanova left a comment:
@aditi-pandit Aditi, thank you for clarifying. It is interesting that the tasks table is populated on the workers. I wonder why; all the information is available on the coordinator. CC: @tdcmeehan
I think the reason for this is historical: you could always deploy Presto in a mode where many or all of the workers also functioned as coordinators. In this mode, any single coordinator would only know of the tasks whose queries are local to that coordinator.
@tdcmeehan Tim, thank you for clarifying. I didn't know about this deployment scheme. I'm not sure I understand how this works though. When there are multiple coordinators, wouldn't query results depend on which coordinator is being asked to process the query? Are you saying that in this setup a query can be routed to any coordinator and the results are expected to be the same? I guess in this case it is necessary to ask all the workers to report their tasks since, as you pointed out, a single coordinator knows about a subset of tasks only.
Generally speaking, Java workers are not compatible with native workers: they use different hash functions and different intermediate results for aggregations. Hence, we had to make a change to run the system connector only on the coordinator and to introduce an exchange before the partial aggregation. These changes may get in the way of making this PR work.
In this scheme, queries are sticky to a single coordinator (after you POST a query, each …
@tdcmeehan Tim, thank you for clarifying. One more follow-up question: in a multi-coordinator deployment, do all workers report themselves to all coordinators, or is a given worker assigned to just one coordinator? In other words, do we have N coordinators managing a shared pool of workers, or just N "mini" clusters that are independent of each other?
Workers report themselves to a single discovery service, which is either replicated to other coordinators in an eventually consistent manner, or the discovery service is a single process which is separate from the coordinators. Originally, when this system connector was written, there was no concept of shared resources (e.g. resource groups, global memory management, etc.) and it relied purely on individual backpressure from workers, although there are now tools to help make that work. |
@tdcmeehan Tim, I wonder if it still makes sense to support this deployment model. What do you think? Does it make sense to consider it when thinking about native workers?
Tactically and short term, I think it would be great to support this if there were an easy and not hacky way to get it to work with #21725 and #21285. But given that most people would be deploying Presto for their large to medium size data lakes, I don't think an Impala-style deployment model makes sense for Presto's future, and personally I feel comfortable saying we can deprecate it in the future. That being said, system tables in the coordinator present a challenge for what I feel is one of the end goals of moving to native, which is simplifying our Java code. I'd like to think about a way to move this to C++ so it doesn't need to be present in the Java SPI (thinking way ahead in the future, if the only reason we retain page source providers is for system tables, I think it would be worthwhile to think about how to move system tables to C++). So I'd like to revisit the presumption at some point that system tables must be coordinator-provided tables, since even now that's not necessarily true.
@mbasmanova, @tdcmeehan: Thanks for the discussion; it has been informative. If we want to stay with this approach of getting the tasks table on the worker, we could modify #21725 and #21285 to not perform those rewrites for the system.runtime.tasks table specifically, since it is based on the worker. #21725 could work unmodified as well; it would just mean that we don't allow partial agg over the tasks table, which might not be a big deal unless a massive number of queries are scheduled in the cluster. wdyt?
The other fixable issue we are hitting internally in a large setup when querying system tables is that the native worker does not yet handle chunked HTTP responses. @tdcmeehan, do you know what causes a chunked HTTP response from the coordinator? I tried reproducing with a large system table (many entries) but could not.
@majetideepak Chunked responses used to be produced by the task/async endpoint, which was removed in #21772. You should not see issues if you update past that PR.
@mbasmanova thank you for the pointer!
@mbasmanova, @majetideepak: Thanks for your previous input. This code is ready for a full review now. Looking forward to your comments.
The tasks table gets data from all nodes, so both the coordinator and the workers. Since the coordinator generates data, both of the previous planner rules are also applicable.
arhimondr left a comment:
Mostly style related comments / questions. Otherwise looks good.
class SystemTableHandle : public velox::connector::ConnectorTableHandle {
 public:
  explicit SystemTableHandle(
      std::string connectorId,
Do we usually prefer to pass by value and then move? Or pass by const reference and copy? When do we prefer one over the other?
@arhimondr: Good question. I prefer pass by const ref and copy, to avoid use-after-move at the caller. But I've seen pass by value and move as a common pattern in Velox, especially in PlanNode construction.
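To make the trade-off concrete, here is a minimal sketch contrasting the two constructor styles discussed above (the class names are illustrative, not the actual presto_cpp types): pass-by-value-and-move lets an rvalue argument be moved all the way through, while const-ref-and-copy always copies but never leaves the caller's argument in a moved-from state.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Sink style: pass by value, then move. An rvalue argument is moved all the
// way through; an lvalue costs one copy into the parameter plus one move.
class ByValueHandle {
 public:
  explicit ByValueHandle(std::string connectorId)
      : connectorId_(std::move(connectorId)) {}

  const std::string& connectorId() const {
    return connectorId_;
  }

 private:
  std::string connectorId_;
};

// Const-ref style: always one copy inside the constructor, but the caller's
// argument is never moved from, so no use-after-move is possible at the
// call site.
class ByRefHandle {
 public:
  explicit ByRefHandle(const std::string& connectorId)
      : connectorId_(connectorId) {}

  const std::string& connectorId() const {
    return connectorId_;
  }

 private:
  std::string connectorId_;
};
```

With the by-value signature, `ByValueHandle h(std::move(id));` avoids the copy but leaves `id` in a moved-from state afterwards, which is exactly the caller-side hazard mentioned above.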
majetideepak left a comment:
@aditi-pandit some comments. Thanks!
obj["lastHeartbeatMs"] = lastHeartbeatMs;
obj["lastTaskStatsUpdateMs"] = lastTaskStatsUpdateMs;
obj["lastMemoryReservation"] = lastMemoryReservation;
obj["createTime"] = createTime;
Why are we updating these values in this PR?
@majetideepak: In the other createTime-style fields the values were changed to a timestamp, so there was conversion back and forth. Hence, these new fields were added.
- { name: HiveTableLayoutHandle, key: hive }
- { name: IcebergTableLayoutHandle, key: hive-iceberg }
- { name: TpchTableLayoutHandle, key: tpch }
- { name: SystemTableLayoutHandle, key: $system }
Why don't we need system and $system@system here?
We need the protocol JSON classes to be generated only once by this script, so there isn't a need for all 3 catalog name mappings here. The mapping of the protocol to the key/catalog name now happens in the PrestoToVeloxConnector code, so that's where the 3 catalog name mappings live.
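As a hypothetical sketch of that design (the function and alias table below are invented for illustration, not the actual PrestoToVeloxConnector API): several catalog spellings can resolve to a single connector key at runtime, which is why the codegen script only needs one entry.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Hypothetical alias table: several catalog names resolve to one connector
// key, so the protocol JSON classes need to be generated only once.
std::string resolveConnectorKey(const std::string& catalogName) {
  static const std::unordered_map<std::string, std::string> kAliases = {
      {"system", "$system"},
      {"$system@system", "$system"},
  };
  auto it = kAliases.find(catalogName);
  return it == kAliases.end() ? catalogName : it->second;
}
```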
czentgr left a comment:
This is nice and a great tutorial on how to implement basic connectors!
@majetideepak: I have addressed your review comments. Would appreciate another pass. Thanks!
majetideepak left a comment:
@aditi-pandit few comments. Thanks!
majetideepak left a comment:
Thanks, @aditi-pandit
Description
SystemConnector is a Presto connector for system tables. System tables include runtime schema tables like system.runtime.{nodes|tasks|queries|transactions}, properties tables (table properties, schema properties, column properties, analyze properties), and Hive & Iceberg metadata tables.
SystemConnector tables are unique in that all of them are populated from metadata structures on the coordinator (and optionally from the workers). This metadata can be internal process metadata for the runtime tables, or metadata obtained from HMS or the Iceberg catalog.
The distribution of a SystemTable can be ALL_NODES, ALL_COORDINATORS, or SINGLE_COORDINATOR (from https://github.com/prestodb/presto/blob/master/presto-spi/src/main/java/com/facebook/presto/spi/SystemTable.java#L24).
Only one table, system.runtime.tasks, is populated on ALL_NODES. So this table gets results from the coordinator as well as the workers.
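A rough C++ rendering of the Java SystemTable distribution modes linked above (the helper function is illustrative only, not part of any real API): only ALL_NODES tables need a worker-side connector.

```cpp
#include <cassert>

// Mirrors the three distribution modes from the Java SPI's SystemTable.
enum class SystemTableDistribution {
  kAllNodes,          // e.g. system.runtime.tasks
  kAllCoordinators,
  kSingleCoordinator,
};

// Hypothetical helper: only ALL_NODES tables require a connector on workers.
bool needsWorkerConnector(SystemTableDistribution d) {
  return d == SystemTableDistribution::kAllNodes;
}
```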
In the past, querying this table was broken on Prestissimo since there was no system catalog/connector on the workers. This PR enhances the native workers with a system connector/catalog that is used to populate the tasks table. The SystemConnector uses the Presto TaskManager's task map to populate this table.
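As a hedged sketch of what "populate the tasks table from the TaskManager task map" might look like (TaskInfoLite and makeTaskRows are invented names for this illustration; the real presto_cpp code produces Velox vectors, not strings):

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Invented stand-in for the per-task metadata kept by the worker.
struct TaskInfoLite {
  std::string taskId;
  std::string state;
};

// Hypothetical sketch: walk the worker's in-memory task map and emit one
// row per task for system.runtime.tasks.
std::vector<std::vector<std::string>> makeTaskRows(
    const std::map<std::string, TaskInfoLite>& taskMap) {
  std::vector<std::vector<std::string>> rows;
  rows.reserve(taskMap.size());
  for (const auto& [id, info] : taskMap) {
    rows.push_back({info.taskId, info.state});
  }
  return rows;
}
```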
There is one more design point: the Java coordinator is not fully compatible with native workers. They use different hash functions and different intermediate results for aggregations. So some changes were needed for running the system connector on the coordinator.
The changes are the planner rules from #21725 and #21285: running the system connector scan only on the coordinator, and introducing an exchange before the partial aggregation. Both of these planning rules are applicable to the tasks table as well, since it generates data at both the coordinator and the workers.
Motivation and Context
The system.runtime.tasks table is used very frequently in deployment scripts. Querying this table was broken in Prestissimo.
#21413
Test Plan
Added e2e tests