Attach weights to IcebergSplits #12579
Conversation
                        true))
                .add(doubleProperty(
                        SPLIT_WEIGHT_MAX,
                        "split weight max",
I don't understand the meaning of this session property.
Is it for benchmarking only, and can it be removed?
I think we can likely remove it. Maybe @pettyjamesm can you weigh in on why there's the minimum_assigned_split_weight session property in Hive?
In the Hive implementation, the minimum split weight is a control mechanism to avoid over-queueing really small files, where the computed weight proportional to the target split size would be very small (eg: 1KB files with a split size of 64MB).
In those scenarios, the scheduler would be allowed to assign a huge number of splits to worker split queues, which could make the task update request JSON payload huge and/or skew task completion time unevenly across workers. In practice, splits have a certain fixed overhead regardless of input bytes that needs to be accounted for, so I added a minimum weight setting to address that.
I didn't have a use-case for allowing splits to be "larger" than standard, and didn't want to regress queries with, for example, "unsplittable" GZIP compressed text files larger than the target split size, so in the trino-hive implementation I chose to clamp the maximum split weight to SplitWeight.standard() (ie: proportion=1.0). The code in trino-main and trino-spi will support larger than standard weights, but I'm not aware of a use-case where it makes sense to queue significantly fewer splits than the scheduler is configured to support.
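The clamping scheme described above can be sketched as follows. This is an illustrative sketch only, not the actual trino-hive code; the method and parameter names are made up for the example:

```java
public class ClampedSplitWeight
{
    // minimumWeight plays the role of Hive's minimum_assigned_split_weight;
    // all names here are illustrative, not the trino-hive implementation
    public static double weightFor(long splitBytes, long targetSplitBytes, double minimumWeight)
    {
        double proportional = (double) splitBytes / targetSplitBytes;
        // clamp to [minimumWeight, 1.0]: tiny files cannot flood the queue,
        // and oversized (eg: unsplittable GZIP) files stay at standard weight
        return Math.min(1.0, Math.max(minimumWeight, proportional));
    }

    public static void main(String[] args)
    {
        // a 1KB file against a 64MB target would otherwise weigh ~0.000015
        System.out.println(weightFor(1024, 64L << 20, 0.05));       // 0.05
        System.out.println(weightFor(128L << 20, 64L << 20, 0.05)); // 1.0
    }
}
```

With a minimum of 0.05, the scheduler can queue at most 20x more splits than it would with all-standard weights.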
Makes sense. Would users actually know when to modify the session property or do you think we can get the same effect with a hard-coded minimum?
I tested experimentally with some scan-heavy queries over JSON files in S3 (eg: select count(*) from table) and saw the point of diminishing returns around the current default minimum value of 0.05, or a 20x increase in queue depth. It's possible that someone running in a different environment (maybe HDFS storage nodes with local SSDs?) could see further improvements by setting a lower value than I could measure, so I left it configurable just in case.
In those scenario's, the scheduler would be allowed to assign a huge number of splits to worker split queues which could make the task update request JSON payload huge
this should be taken care of by the engine, not the connector
splits have a certain fixed overhead regardless of input bytes that needed to be accounted for, so I added a minimum weight setting to address that.
good point, so we should model a split weight as constant + bytes
Would users actually know when to modify the session property or do you think we can get the same effect with a hard-coded minimum?
from the expert conversation above, I wouldn't be able to set these properties reasonably.
I think we should have some constant factor instead.
so we should model a split weight as constant + bytes
Turns out that's what Iceberg does for Spark: https://github.com/apache/iceberg/blame/9ab94f87de036c9cd91cf8353906a576b4a516ff/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java#L70
I didn't account for delete files either. Splits of the same size could read a lot more data if they have delete files to read. I'm not sure if that's worth accounting for yet
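A "constant + bytes" model, in the spirit of the linked Iceberg TableScanUtil code, could look like the sketch below. The open-file-cost constant here is an assumption for illustration, not a value taken from either codebase:

```java
public class ConstantPlusBytesWeight
{
    // hypothetical fixed per-split overhead, expressed in "equivalent bytes";
    // the 4MB value is illustrative, not a constant from Iceberg or Trino
    private static final long OPEN_FILE_COST_BYTES = 4L << 20;

    public static double weightFor(long splitBytes, long targetSplitBytes)
    {
        // every split pays a constant cost plus a size-proportional cost,
        // so even empty files never round down to a near-zero weight
        return (double) (OPEN_FILE_COST_BYTES + splitBytes) / targetSplitBytes;
    }

    public static void main(String[] args)
    {
        long target = 64L << 20;
        System.out.println(weightFor(0, target));         // 0.0625: floor set by the constant
        System.out.println(weightFor(60L << 20, target)); // 1.0
    }
}
```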
this should be taken care of by the engine, not the connector
The connector is supposed to implement the weighting scheme reasonably, using whatever heuristics and control bounds make sense for that connector. In the case of hive and iceberg, there is this risk without a minimum weight; for other connectors, the splits themselves might actually benefit from allowing the scheduler to assign 100x more splits that are extremely cheap to process and serialize as JSON than the splits that connector considers "standard".
@pettyjamesm a connector doesn't know engine's overhead of sending and processing a split, so cannot determine whether "100x more" is ok
The connector should know what the JSON payload overhead of its own split type is on a per-split basis, as well as the overhead associated with processing a given split if it involves things like, eg: opening a connection to S3 or issuing a query to a MySQL database. It doesn't seem unreasonable to me that connectors be expected to implement split weighting responsibly if they choose to implement it at all.
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
@Override
public SplitWeight getSplitWeight()
{
    return splitWeight;
SplitWeight.fromProportion(length)
besides being simpler, it's better to avoid a new field, so that it's clear what the weight is, and that it's reasonable and consistent with the split's other state
We could try SplitWeight.fromRawValue(length) but that comes with the implication that 100 is the value of a "standard size split" which isn't right for length. I think if we do that the weights will imply that regular size splits are very large and may be scheduled slower. I haven't tested it though.
My understanding from skimming the code is that if all splits return fromRawValue(100), or fromProportion(1.0) splits will be scheduled exactly how they are now, which is what we want for full size splits.
That's correct. The internal representation is an integer value, normalized so that a single standard split weight is 100. This is done to avoid floating point error accumulation. This is considered to be an implementation detail, and we made the choice based on the initial PR feedback to expose a way for connectors to express split weights relative to 1.0, in case someone felt the need to adjust the representation of a "standard" weight for higher granularity, eg: normalize the "standard" weight to 1,000.
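The normalization can be made concrete with a small sketch. The constant mirrors the description above (a standard split has a raw value of 100); this is not the trino-spi source. Summing many small proportions as doubles accumulates floating point error, while summing their integer raw values stays exact:

```java
public class RawWeightDemo
{
    private static final int STANDARD_RAW_VALUE = 100; // raw 100 == proportion 1.0

    public static long toRawValue(double proportion)
    {
        return Math.round(proportion * STANDARD_RAW_VALUE);
    }

    public static void main(String[] args)
    {
        // ten splits each weighing 0.1 should total exactly one standard split
        double doubleSum = 0;
        long rawSum = 0;
        for (int i = 0; i < 10; i++) {
            doubleSum += 0.1;
            rawSum += toRawValue(0.1);
        }
        System.out.println(doubleSum == 1.0);             // false: floating point drift
        System.out.println(rawSum == STANDARD_RAW_VALUE); // true: integer sum is exact
    }
}
```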
To address the initial feedback point, you need to know the split's length in bytes relative to the configured target split size, which means that you would need to compute the value and store it as a field unless the target split size was also hard-coded.
I now understand this isn't a good idea.
To address the initial feedback point, you need to know the split's length in bytes relative to the configured target split size
that's a weird concept from the SPI perspective, since "configured target split size" is not an SPI concept itself
@pettyjamesm if a connector uses fromProportion(d), is the relation between d and 1.0 really important?
how?
(and where is it documented? :) )
A split with a weight of 1.0 is "standard". If all splits are standard weight, then the scheduler will place exactly as many splits per node and per task as the scheduler config specifies. If all splits have a weight of 0.5, then the scheduler is allowed to place 2x as many splits per node and task, and with all splits weighing 2.0, half as many.
It's not explicitly documented, although the PrestoDB PR which merged after the Trino PR did include references to this behavior in the NodeScheduler Properties Reference. We should probably port those documentation changes.
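The arithmetic in that explanation can be sketched directly. The per-node budget name below is made up for illustration; the real scheduler configuration properties differ:

```java
public class SchedulerCapacitySketch
{
    // hypothetical per-node budget, expressed in units of standard (1.0) split weight
    public static long splitsPerNode(int maxStandardSplitsPerNode, double perSplitWeight)
    {
        // the node keeps accepting splits until their combined weight reaches the budget
        return (long) Math.floor(maxStandardSplitsPerNode / perSplitWeight);
    }

    public static void main(String[] args)
    {
        System.out.println(splitsPerNode(100, 1.0)); // 100: exactly as configured
        System.out.println(splitsPerNode(100, 0.5)); // 200: 2x as many light splits
        System.out.println(splitsPerNode(100, 2.0)); // 50: half as many heavy splits
    }
}
```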
We should probably port those documentation changes.
thanks, please do
The Iceberg planTasks method buckets small files together into combined scan tasks, but these combined tasks are not used. Instead just plan individual FileScanTasks with the target size.
Could task.length() / tableScan.targetSplitSize() yield a value > 1.0? If so, you'll probably want to wrap that with Math.min(1.0, (double) task.length() / tableScan.targetSplitSize())
Looks like if the file format defines split offsets those are used instead, and I guess they could be bigger than the target split size.
So if a split is larger than standard, we want to cap that at 1.0 rather than accounting for the larger size in the queue?
That was the choice I went with in the case of trino-hive, since I didn't want to regress queries. At some point, you do have to set a cap on the upper bound, or you could have splits that never get scheduled because there's never enough room in a worker queue to accept them. You could choose to go maybe as high as 2.0 if you think that's worthwhile, but the safer bet is definitely 1.0. I don't know enough about how tableScan.targetSize() and split generation in general work in Iceberg to know whether it's a real concern.
is capping to 1.0 required by SplitWeight?
the class doesn't convey that
There is no requirement for split weights to be capped at 1.0 and larger weights are allowed if the connector wishes to express that fewer splits should be allowed to be placed per node and task. I chose not to experiment too heavily with "larger than standard" weights in the Hive connector implementation, but there wasn't any good reason to prevent other connectors from choosing to do so if it made sense for their context. It's possible that even with the Hive connector, there are scenarios that could benefit- but the information available to make a decision like that is fairly limited and I didn't pursue experimenting with it.
larger weights are allowed if the connector wishes to express that fewer splits should be allowed to be placed per node and task.
I think connector shouldn't make such a decision. After all, it doesn't control the nodes and the task scheduler and it doesn't know queue lengths, etc.
in fact, this discussion shows that this is (a) an expert-level thing to tune and (b) tuning this is a problem that blurs the engine/connector separation. Things may need to be configured on both sides for optimal behavior.
I think connector shouldn't make such a decision. After all, it doesn't control the nodes and the task scheduler and it doesn't know queue lengths, etc.
The connector doesn't need to actively participate in real time with the scheduler and know things like queue lengths, that was kind of the point for going with the "weight based" approach.
in fact, this discussion shows that this is (a) an expert-level thing to tune
I'm not so sure that's true, because "tuning" the value would always require experimentation to find the point at which workers are no longer bottlenecked on not having enough splits.
(b) tuning this is a problem that blurs engine/connector separation. Things may need to be configured on both sides for optimal behavior.
I agree that the implementation is extending the connector's ability to influence the split scheduler, but that's by design. The risk of being misconfigured, however, is fairly small. There's a wide region of "harmless suboptimality" where either a) split weights are too high, and performance could be better if workers didn't spend idle cycles waiting for the next batch of splits to be assigned, or b) split weights are too low, and the split queues are deeper than they need to be to stay busy. In practice, the "catastrophically misconfigured" scenario requires a connector to be extremely aggressive in its weight assignments, which would be a bug that connectors should be expected to fix and can be mitigated by disabling weighted scheduling within that connector if need be.
pettyjamesm left a comment
The split weight commit LGTM
Capture in a code comment why 0.05 is the default.
Add a similar comment in Hive.
I don't think we should have a session property for this. Per my understanding, this is a safety toggle.
In particular, a user may set the value to 0.0 (or close to 0.0), destabilizing the cluster.
(btw if you convince me this stays, it needs a validator as in )
I think we should remove this.
This PR #12656 removes this from hive.
@alexjo2144 we can go ahead with this PR, and I can update my PR to remove it from both
use config's default (new IcebergConfig().get...)
The weight is equal to the split size divided by the target split size.
With config properties changing, we need to modify our docs. @alexjo2144 are you able to do this, or would you like me to take the lead? cc @findepi
Thanks for the reminder. I can put a PR up with the doc change
Description
Performance improvements for queries against Iceberg tables with small files.
Performance improvement
Iceberg connector with all catalogs
Improve query execution time when the table contains many small files
Related issues, pull requests, and links
Documentation
( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
(x) No release notes entries required.
( ) Release notes entries required with the following suggested text: