
Weighted Split Scheduling#9059

Merged
martint merged 3 commits into trinodb:master from pettyjamesm:weighted-split-scheduling
Oct 23, 2021

Conversation

@pettyjamesm
Member

@pettyjamesm pettyjamesm commented Aug 31, 2021

Adapted changes from prestodb/presto#16668

Docs are pending but basically this change:

  • Adds a SplitWeight field to ConnectorSplit which allows connectors to indicate "this split is smaller than normal by a factor of X" (for Hive, currently the only implementation, this is based on size in bytes, ie: small files)
  • Changes NodeScheduler and related classes to assign splits to workers based on their weight instead of the split count alone

The effect of the above is that when splits are sized appropriately, no behavior changes, but when splits are small (ie: when the Hive connector is processing small files) the worker split queues are allowed to be deeper to compensate, which significantly improves performance.

Description of Changes

Changes to trino-spi and trino-main

ConnectorSplits now carry a SplitWeight, which by default returns a "standard" split weight but which can be overridden by connector implementations to influence split scheduling behaviors.

All NodeScheduler split assignment decisions are now based on node total and task queued split weight totals instead of split counts, except for "task unacknowledged split counts" (a pre-existing behavior controlled by NodeSchedulerConfig(node-scheduler.max-unacknowledged-splits-per-task)). That configuration is now much more significant since it can be used to control how large individual task update requests sent from the coordinator to workers can get when a large number of splits with small weights are scheduled.
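As a rough illustration of how a count-based limit generalizes to a weight budget (a standalone sketch with illustrative names and a hypothetical STANDARD_WEIGHT constant, not the actual Trino scheduler code): a configured limit of N splits becomes a budget of N standard weights, so standard-weight splits behave exactly as before while lighter splits can queue deeper.

```java
// Hypothetical sketch, not the actual Trino SPI or scheduler classes.
public class WeightedQueueSketch {
    static final long STANDARD_WEIGHT = 100;

    // How many splits of a given weight fit inside a limit that was
    // configured as a count of "standard" splits.
    static int admissibleSplits(long splitWeight, int maxSplitsPerNode) {
        long budget = maxSplitsPerNode * STANDARD_WEIGHT;
        long queued = 0;
        int admitted = 0;
        while (queued + splitWeight <= budget) {
            queued += splitWeight;
            admitted++;
        }
        return admitted;
    }

    public static void main(String[] args) {
        // standard-weight splits: behavior unchanged, exactly 100 splits fit
        System.out.println(admissibleSplits(STANDARD_WEIGHT, 100)); // 100
        // splits weighted at 1/20 of standard: the queue may be 20x deeper
        System.out.println(admissibleSplits(5, 100)); // 2000
    }
}
```

With a budget of 100 standard splits, splits weighted at 1/20 of standard are admitted 20x deeper, which is the compensation effect described above.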

Changes to trino-hive

A version of split weighting for the hive connector is included, which is enabled by default but can be disabled by
setting the Hive session property size_based_split_weights_enabled=false or the configuration property hive.size-based-split-weights-enabled=false. When disabled, all splits are assigned the standard weight.

When enabled, splits are assigned their weight based on their size in bytes relative to hive.max-split-size. Splits that are 1/2 of the max split size will be weighted as 1/2 of the standard split weight. In this implementation, no split will be
assigned a weight smaller than the value set by the minimum_assigned_split_weight hive session property or the
hive.minimum-assigned-split-weight configuration property (default: 5). This provides a mechanism to control how aggressively the scheduler will respond to the presence of small files. With the current standard split
weight of 100, this means that split queues will at most be scheduled 20x deeper when all splits are smaller than 1/20th of the max split size.

Currently, splits that are greater than the hive.max-split-size value (eg: unsplittable files) are also assigned the standard split weight, such that any given assigned weight will always fall between the minimum assigned and standard weight. This is an implementation choice for the Hive connector, but not a strict requirement on the behavior that connectors might choose to implement in the future.
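The clamping described above can be sketched as follows (a hypothetical helper, not the actual Hive connector code; the 64MB max split size and 0.05 minimum are example values, and weights are expressed here as proportions of a standard split): the weight is the split's size as a proportion of hive.max-split-size, clamped between the configured minimum and the standard weight of 1.0.

```java
// Illustrative sketch of size-based split weighting; names and values
// are assumptions, not the real Hive connector implementation.
public class HiveWeightSketch {
    static double sizeBasedWeight(long splitBytes, long maxSplitBytes, double minimumWeight) {
        double proportion = (double) splitBytes / maxSplitBytes;
        // never below the configured minimum, never above the standard weight
        return Math.min(1.0, Math.max(minimumWeight, proportion));
    }

    public static void main(String[] args) {
        long maxSplitSize = 64L * 1024 * 1024; // e.g. hive.max-split-size = 64MB
        System.out.println(sizeBasedWeight(maxSplitSize / 2, maxSplitSize, 0.05)); // 0.5
        System.out.println(sizeBasedWeight(1024, maxSplitSize, 0.05));             // clamped to 0.05
        System.out.println(sizeBasedWeight(2 * maxSplitSize, maxSplitSize, 0.05)); // capped at 1.0
    }
}
```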

Benchmarks

TPCH scale factor 10GB suite datasets were generated in both Parquet and JSON:

| Dataset | Rows Per File | Typical Resulting File Size | Compression |
| --- | --- | --- | --- |
| Parquet Normal | 10M | 210MB | Snappy |
| Parquet Small | 3,000 | 140KB | Snappy |
| JSON Normal | 10M | 2GB | Uncompressed |
| JSON Small | 3,000 | 1MB | Uncompressed |
  • The relatively small 10GB scale factor was chosen because otherwise the small file datasets generated way too many S3 objects and the data generation process would fail because of S3 throttling
  • The JSON files were uncompressed so that the normal files dataset splits would still be considered splittable. Compressed files are not splittable and sending a few large splits to workers for single threaded processing on workers would not have been effective at comparing the effect of scheduler behavior.

TPCH suite execution time geomean measurements for PrestoDB (not tested separately with Trino, but assumed to be comparable based on the similarity of scheduling behaviors in this context), collected on a cluster of r5.8xlarge instances, with 5 worker nodes (and one coordinator):

Baseline

| File Format | Small File Geomean | Normal File Geomean |
| --- | --- | --- |
| Parquet | 32.86 | 2.6 |
| JSON | 32.18 | 5.4 |

Improved

| File Format | Small File Geomean | Small File Improvement | Normal File Geomean | Normal File Improvement |
| --- | --- | --- | --- | --- |
| Parquet | 14.04 | ~2.34x | 2.61 | unchanged |
| JSON | 8.73 | ~3.68x | 5.33 | unchanged |

Note that before weighted scheduling, both Parquet and JSON small-file runs performed about the same because both were bottlenecked on split scheduling throughput and latency. With weighted scheduling enabled, the bottleneck becomes worker I/O and decoding throughput, so Parquet and JSON perform differently as a result.

@findepi
Member

findepi commented Aug 31, 2021

What is the Weighted Split scheduling?

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch from df7827f to 16ab6fb Compare August 31, 2021 16:17
@pettyjamesm pettyjamesm marked this pull request as ready for review September 1, 2021 14:57
@pettyjamesm pettyjamesm requested a review from dain September 1, 2021 15:04
@sopel39
Member

sopel39 commented Sep 2, 2021

TPCH scale factor 10GB suite datasets were generated in both Parquet and JSON:

@pettyjamesm did you experiment with partitioned TPC-DS?

@sopel39 sopel39 self-requested a review September 2, 2021 15:53
@pettyjamesm
Member Author

pettyjamesm commented Sep 2, 2021

@sopel39 I did not, because in this context it seemed redundant. TPCH is a stand-in for "some arbitrary workload" (I could have chosen TPCDS, but I did not). If I had instead chosen to focus on scan throughput workloads (eg: SELECT count(*) from ...), I would have had much more dramatic factors of improvement.

The queries themselves in this case don't matter (as long as they're not cherry-picked to emphasize scan throughput) so long as the plans are identical between the baseline and experiment runs (they are). What we're measuring is the bottleneck of scheduling only 100 splits per node and 10 splits queued per task when splits are small, compared to making the split assignment thresholds adaptive based on split size. Incidentally, the "normal" file size cases are included to make sure that performance doesn't regress when splits are "normal".
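As a back-of-envelope sketch of that bottleneck (all numbers hypothetical except the 100-splits-per-node limit mentioned above): with a fixed per-node split count cap, shrinking the split size multiplies the number of scheduling round-trips needed to push the same dataset through the cluster.

```java
// Illustrative arithmetic only; not Trino code, and the dataset sizes
// are assumptions chosen to mirror the "normal" vs "small file" cases.
public class SchedulingRoundsSketch {
    static long roundsToSchedule(long totalSplits, long splitsPerRoundPerNode, int nodes) {
        long perRound = splitsPerRoundPerNode * nodes;
        return (totalSplits + perRound - 1) / perRound; // ceiling division
    }

    public static void main(String[] args) {
        int nodes = 5;
        // "normal" splits: 10GB dataset at ~128MB per split -> ~80 splits
        System.out.println(roundsToSchedule(80, 100, nodes));     // 1 round
        // "small file" splits: 10GB dataset at ~1MB per file -> ~10,000 splits
        System.out.println(roundsToSchedule(10_000, 100, nodes)); // 20 rounds
    }
}
```

With weight-based limits letting queues sit 20x deeper for such small splits, the small-file case collapses back toward a single scheduling pass.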

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch from 16ab6fb to c8096c1 Compare September 20, 2021 21:09
Contributor

As discussed offline this limits the maximum ratio that can be expressed.

It is not uncommon for a standard split size to be set in the 256 / 512MB range while the actual splits (due to a small file problem) are much smaller (e.g.: 1 - 10MB). With the standard weight of 100 it is impossible to express the difference in size between a 1MB split and a 10MB split.

At this point I'm not sure if it makes a big practical difference (due to the constant IO cost of opening a split). However, SplitWeight is going to be part of the high-level SPI that ideally should be kept backward compatible. It feels like leaving flexibility to express higher ratios, to future-proof it, wouldn't hurt.

Since using a floating point number is not an option (due to error accumulation) I would recommend using BigDecimal to represent weight. With a standard weight of 1, a weight of 0.5 would mean that the split is 2x smaller, while a weight of 2 would mean that a split is twice as large (compared to the standard size). MathContext.DECIMAL32 should provide enough precision to represent any meaningful ratios of split size. Additionally, having it as a decimal number feels more intuitive.

Member

Since using a floating point number is not an option (due to error accumulation)

@arhimondr are you sure?

Contributor

That's a great question. I'm not sure actually. That was a takeaway from a quick offline discussion we had with James. While in theory floating point numbers tend to lose precision over long running sequences, or when operating on numbers that are multiple orders of magnitude apart, it doesn't feel like the split queue depth should result in high magnitude differences between the current queue weight and a single split weight.

    import java.math.BigDecimal;

    public class PrecisionComparison
    {
        public static void main(String[] args)
        {
            double doubleSum = 10_000_000;
            BigDecimal bigDecimalSum = BigDecimal.valueOf(10_000_000);
            for (int i = 0; i < 100_000_000; i++) {
                doubleSum += 0.000001;
                bigDecimalSum = bigDecimalSum.add(BigDecimal.valueOf(1, 6));
            }
            System.out.printf("Double: %s, Decimal: %s\n", doubleSum, bigDecimalSum);
            for (int i = 0; i < 100_000_000; i++) {
                doubleSum -= 0.000001;
                bigDecimalSum = bigDecimalSum.subtract(BigDecimal.valueOf(1, 6));
            }
            System.out.printf("Double: %s, Decimal: %s\n", doubleSum, bigDecimalSum);
        }
    }

    > Double: 1.0000100024044514E7, Decimal: 10000100.000000
    > Double: 1.0E7, Decimal: 10000000.000000

I wonder if the precision loss is significant for this specific use case? @pettyjamesm could you please elaborate a little more on the problems you observed with double?

Member Author

I didn't specifically experiment with double and then run into issues; rather, I discarded it because of the potential for these problems. A SplitWeight implementation using double that didn't enforce anything beyond values being > 0 would run the risk of accumulating rounding error, either on the task side or the coordinator side, such that the total weight accounting (for node totals or task queued split weights) wouldn't balance back to 0. If that were to happen, the fractional error could either let the weight value dip below 0.0, or stay above 0 even after all splits finished, which would prevent that node from being fully utilized in the future because of a phantom weight.

It was easier to constrain this implementation to integer semantics to keep the accounting sane, with the unfortunate consequence of having to pick some arbitrary value as the "standard". I talked to @arhimondr about this offline today and I think there's a balance point we can reach: make weights "look" like values relative to 1.0 for the ergonomics of the API, but still use integer arithmetic for the internal bookkeeping logic.
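A small self-contained illustration of the phantom-weight concern and the integer-semantics fix (illustrative code, not Trino's): fractional double weights that are added and later removed may not cancel exactly, while integer-encoded weights always balance back to zero.

```java
// Illustrative sketch only; the encoding of 0.1 as 10 assumes a
// hypothetical UNIT_VALUE of 100 as discussed in this thread.
public class AccountingSketch {
    // adds then removes n integer-encoded weights; always returns 0
    static long encodedResidual(int n) {
        long total = 0;
        for (int i = 0; i < n; i++) total += 10;
        for (int i = 0; i < n; i++) total -= 10;
        return total;
    }

    // same accounting with double weights of 0.1
    static double doubleResidual(int n) {
        double total = 0.0;
        for (int i = 0; i < n; i++) total += 0.1;
        for (int i = 0; i < n; i++) total -= 0.1;
        return total;
    }

    public static void main(String[] args) {
        System.out.println(doubleResidual(1_000_000));  // may be a tiny nonzero residue
        System.out.println(encodedResidual(1_000_000)); // exactly 0
    }
}
```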

Contributor

@arhimondr arhimondr Sep 29, 2021

Currently the weight is tracked per node and per task.

Realistically it doesn't feel like an error can accumulate significantly in the scope of a task. The maximum weight assigned shouldn't exceed 100 - 1000, with the minimum weight being in the 0.01 - 0.0001 range. The space of variation for weight in this scenario shouldn't exceed ~8 decimal digits. Double uses 53 significand bits (that is ~16 decimal digits), leaving another 8 decimal digits of precision. The weight value is also going to be discarded as soon as the task is finished, thus it should be okay if it doesn't converge to a perfect 0 at the end.

From what I understand the concern here is mostly around node level weight tracking. It could potentially have a much higher variation space, leaving fewer "digits" for rounding errors, thus it feels like it is safer to encode weight as an integer. @findepi what do you think?

Member

I was answering the original problem that picking integers fixes the minimal granularity we operate at. I didn't follow how these weights are used, and whether the error can accumulate over time, and what are the consequences of this being the case. Would the system no longer be 'perfectly fair', or would it start doing nonsense?

Contributor

nit: Maybe getTotalWeight?

Member

This is an overly specific function to have as a generic feature in this class.

This can easily be done by a caller with:

splitWeights.stream()
    .mapToLong(SplitWeight::getEncodedValue)
    .reduce(0, Math::addExact);

Also, it's something that only the scheduler cares about, so it shouldn't be exposed as part of the SPI.

Comment thread core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/execution/TaskStatus.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/execution/PartitionedSplitsInfo.java Outdated
Contributor

@arhimondr arhimondr Sep 22, 2021

We are trying to avoid using @Nullable. Also it doesn't look like an actual split is needed here. How about addDriverContext(Lifespan lifespan, OptionalLong splitWeight)?

Member Author

I went with just long splitWeight instead since any other alternative was a needless short term object allocation and it made more sense to me to just add some extra validation checks.

Contributor

I wonder what the memory implication of a split on a worker node is. Should there be a limit on the maximum pending splits per node, to avoid scheduling too many split objects, which could in theory push the node out of memory (in some degenerate case)?

Contributor

The ForStage name feels confusing. From what I understand it returns a weight of splits assigned to a node for this specific stage, that effectively means the total weight of splits assigned to a task on that specific node.

Member Author

This was just a straight "find: 'count', replace: 'weight'" in method names exercise, so I tried to avoid renaming methods even if they didn't make total sense (yeah, this is more conceptually "ForTask" and not "ForStage").

This isn't total splits on that node because splits that are actively running are not included, and the semantics are a little subtle because splits in any of the following states (for this particular task) are considered "queued" in this context:

  • On the worker node, queued but not yet running as of the last task status update
  • On the coordinator, not yet sent to the worker but already placed into the HttpRemoteTask to be sent in the next update request
  • Tentatively assigned to the task during the current split scheduling batch, but not yet placed into the HttpRemoteTask

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch from c8096c1 to 56b2ab0 Compare September 23, 2021 20:48
@arhimondr
Contributor

arhimondr commented Sep 23, 2021

Summarizing some of the key points discussed offline with @pettyjamesm

The scheduler is trying to optimize split assignment to decrease query latency. Assigning too many or too few splits to a node may result in increased latency. When too many splits are assigned to a single node it may negatively impact query parallelism (there will be no more splits to assign to other available nodes). When too few splits are assigned to a single node there's a chance of that node sitting idle waiting for the next split assignment.

Currently the scheduler assumes that all splits are of the same size, in other words that every split takes exactly the same amount of time to be processed. Since in reality splits are not of the same "size", it is worth giving connectors an opportunity to provide "hints" to the scheduler about the relative size of a split. This PR introduces the concept of a split weight that is intended to provide such a "hint": the relative size of a split compared to a split of a "standard size".

At this point there are a couple of open questions:

  1. Should the weight be aligned between connectors? What guidance should a connector developer follow when assigning weights?
    • Currently weight is assigned based on the configured split size.
  2. How to configure maximum split queue depth?
    • Currently queue depth is expressed in a number of splits of a "standard" weight.
  3. What weight resolution is considered required, practical and future proof?
    • With the standard weight of 100 it is impossible to express that one split is more than 100x smaller than a standard one. Is this sufficient?
  4. How to encode weight to minimize computation and encoding costs?
    • The decision of encoding weight as an integer with a standard weight of 100 is based on the following considerations:
      - double may lose precision
      - BigDecimal could be slow and may potentially slow down the scheduler
      - int based weight is fast, yet it requires a baseline to express fractional weights. The conservatively low default of 100 is chosen to optimize JSON encoding. Setting it to something higher would result in extra zeros being transferred for each round-trip.
  5. Is it required to have a limit on the maximum number of splits assigned to a single node (to avoid unnecessary memory related overhead)?
    • Currently this PR still enforces local queue depth based on the number of splits, but no longer enforces worker queue depth based on the number of splits. It can in theory result in higher memory overhead when scheduling too many splits with relatively low weight.

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch from 56b2ab0 to d335bc8 Compare September 28, 2021 14:42
@pettyjamesm
Member Author

pettyjamesm commented Sep 28, 2021

I've pushed a new commit to address point 3 for review and consideration. Below are my inline responses / observations about the open questions:

  1. Should the weight be aligned between connectors? What guidance a connector developer should follow when assigning weights?
    • Currently weight is assigned based on the configured split size.

Correct, the current Hive implementation assigns split weights based on size relative to the configured target split size. I believe that, for the time being, the guidance for other connectors should be to assign weights relative to whatever concept of a "standard" size might be applicable. While it would certainly be nice to establish a cross-connector definition of split weight (eg: "expected processing time to complete the split, relative to 1 second") so that the scheduler could be more fair when handling splits between different connectors, I think for now we'll have to leave that as a non-goal of this API, but something that could be built later on top of the weighting abstraction.

  2. How to configure maximum split queue depth?
    • Currently queue depth is expressed in a number of splits of a "standard" weight.

In effect, as currently implemented, the UNIT_VALUE of 100 sets an upper bound on the queue depths as being 100x the configured value. If we decide to increase the unit value to enable better resolution of relative weighting then I would expect that we'll need a guard-rail maximum count value limit in the scheduler, but thankfully full count and weight information is still available to the scheduler so this would be a minor change to add.

  3. What weight resolution is considered required, practical and future proof?
    • With the standard weight of 100 it is impossible to express that one split is more than 100x smaller than a standard one. Is this sufficient?

The primary creation API is now via SplitWeight.fromProportion(double) (I'm open to other names for the internal / external value methods) which accepts a floating point value such that 1.0 is a "standard" split, which will internally represent whatever absolute value we need it to in the future without breaking connector implementations.

  4. How to encode weight to minimize computation and encoding costs?
    • The decision of encoding weight as an integer with a standard weight of 100 is based on the following considerations:
      • double may lose precision
      • BigDecimal could be slow and may potentially slow down the scheduler
      • int based weight is fast, yet it requires a baseline to express fractional weights. The conservatively low default of 100 is chosen to optimize JSON encoding. Setting it to something higher would result in extra zeros being transferred for each round-trip.

The summary around the decision to express values as integers, for arithmetic efficiency and accuracy, is correct. The weights are added / subtracted in batches and in running totals on a per-task and per-worker basis in latency sensitive parts of the code (making BigDecimal less than ideal for performance reasons), and accumulated floating point error could result in a worker's overall current weight sums becoming incoherent through addition / subtraction of assigned weights. The current compromise approach for item 3 of this list should help make this more straightforward to reason about, and flexible if greater precision is required. If we think that greater precision is important now, then I don't see an issue with increasing the UNIT_VALUE to say 1,000 without too much of a JSON serialization concern. That said, letting split queues grow up to 1000x deeper than configured would seem to be excessive and dangerous without the guard-rails mentioned in my reply to item 2.

  5. Is it required to have a limit on the maximum number of splits assigned to a single node (to avoid unnecessary memory related overhead)?
    • Currently this PR still enforces local queue depth based on the number of splits, but no longer enforces worker queue depth based on the number of splits. It can in theory result in higher memory overhead when scheduling too many splits with relatively low weight.

I haven't seen it come up in the environments this currently runs in today (EMR, Athena) with a maximum 100x split queue size increase but it's possible that in other environments or with deeper queues that it could become an issue. Seems like another fair point to consider in whether or not item 2 needs to be addressed before merging (again, especially if we let queues theoretically grow deeper than 100x the configured limit).

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch from d335bc8 to 80efe67 Compare September 29, 2021 20:22
Comment thread core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java Outdated

Member

This one also seems overly specific to be a generic method in this class. Why would "anyone" (aside from specialized behaviors in the scheduler) ever need to calculate the sum of standard split weights for a certain number of splits? This logic would be better placed in the scheduler, not in the SPI.

Member Author

I agree, but as discussed below, this method seems to need to live in this class in order to encapsulate the mapping to a "standard" weight internally, allowing that standard mapping to change in the future without breaking changes.

Member

It doesn't necessarily need to be just "the internal representation". We could say weights are just an integer value larger than 0. In that case, getValue() is the right name for this method.

I haven't looked at the rest of the code yet, but I assume that all that matters for scheduling purposes is the relative weights between splits, in comparison to the total weight assigned to each task.

Member Author

I assume that all that matters for scheduling purposes is the relative weights between splits, in comparison to the total weight assigned to each task.

In some sense, yes: the relative weight is what matters to control the effective behavior. However, this is accomplished by making the weight relative to a "standard weight", not just the relative weight between two individual splits. Most logic that uses the long value is just adding or subtracting these values from some total sum, except for the initial scheduler parameters configured via NodeSchedulerConfig(node-scheduler.max-pending-splits-per-task) and NodeSchedulerConfig(node-scheduler.max-splits-per-node). Critically, that code needs to be able to say that a configured split count of N is equivalent to the combined weight of N "standard" splits.

It doesn't necessarily need to be just "the internal representation".

Yeah, I'm not in love with the name and am happy to change this back to getValue(), but what I'm trying to achieve is a de-coupling between SPI consumers, the scheduler logic, and the chosen integer value for a "standard split", so that in the future we can redefine UNIT_VALUE (currently: 100) to some alternative value (eg: 1,000) without breaking SPI consumers. This was raised during review by @arhimondr as a consideration (that I think is valid): we might in the future need to express relative weights between splits at a higher granularity than a value of 100 allows. To achieve that, there are a few important interaction points to disambiguate from one another to keep the "relative to standard" abstraction encapsulated:

  • SplitWeight#encodedValueForStandardSplitCount for the scheduler to translate its configured split counts into the integer equivalent (again, not in love with the name and happy to change it)
  • SplitWeight#fromProportion(double) for connectors to translate their weighting scheme into a "relative to standard" without hard coding information about the integer value of a standard split, ie: 0.5 should be 1/2 of the standard and 2.0 should be 2x the standard weight, regardless of what the standard weight is chosen to be at any point in time (happy to change the name here too)
  • SplitWeight#getEncodedValue() where the logic is simply doing weight accounting, adding and subtracting this value on a per-split basis, and doesn't need to understand any association of an integer relative to standard to weight (ie: most of the code in trino-main, with the notable exception of the scheduler)
  • SplitWeight#toString() and maybe another method in the future for a connector to recover some "relative to standard" view of a weight value for the purposes of logging / debugging without coupling to an assumption of UNIT_VALUE = 100, which might be too tempting if this method were just named getValue().
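The interaction points above could be sketched roughly like this (a hypothetical standalone approximation using the method names from the discussion; the real SplitWeight implementation may differ): proportions relative to 1.0 are converted to and from an internal UNIT_VALUE that callers never see directly.

```java
// Hypothetical sketch of the encoding scheme, not the actual SplitWeight class.
public class SplitWeightSketch {
    // internal detail, deliberately hidden from callers; could become 1_000 later
    private static final long UNIT_VALUE = 100;

    // connectors declare weights relative to 1.0 as "standard"
    static long fromProportion(double proportion) {
        if (proportion <= 0) {
            throw new IllegalArgumentException("proportion must be > 0: " + proportion);
        }
        // clamp to the smallest expressible nonzero weight
        return Math.max(1, Math.round(proportion * UNIT_VALUE));
    }

    // lets the scheduler translate a configured split count into a weight budget
    static long encodedValueForStandardSplitCount(int count) {
        return Math.multiplyExact(count, UNIT_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(fromProportion(1.0));   // 100: a standard split
        System.out.println(fromProportion(0.5));   // 50: half a standard split
        System.out.println(fromProportion(0.001)); // 1: clamped to the minimum
        System.out.println(encodedValueForStandardSplitCount(10)); // 1000
    }
}
```

Because callers only ever speak in proportions and encoded totals, changing UNIT_VALUE later would alter only the internal resolution, not the SPI contract.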

Member

How about calling it "raw value" or "raw weight"?

Member Author

Sure, that works for me. Will add that as a refactoring commit and once you're happy with the state of the PR overall I'll squash this down to two commits:

  • The core changes to SPI + Main
  • The Hive implementation

Member

Why not treat weight as just the "size in bytes" (or "size in kilobytes", or another unit if we want to avoid potential overflows when adding weights -- although that seems unlikely), instead of trying to normalize to a targetSize dynamically? It would simplify the code a bit.

Member Author

The basic problem here is that we need to normalize weights against:

  1. The hive configured target split size (this is based on size in bytes)
  2. A "standard" split as is understood by the scheduler logic (ie: a standard unit across all connectors, some of which may have no concept of size in bytes)

The logic here is asserting that hive.max-split-size == 1 standard split and then computing a relative weight for each split based on its actual size relative to that.

Member

This wouldn't be necessary if we treat weights as "just an integer larger than 0". See other comments.

Member Author

The problem is that the scheduler needs to know how to translate split counts into split weights, such that any connector can participate in the weighting of splits without having to establish a new definition in common. The definition here is that the scheduler has a standard conversion of 1 standard split to some integer value (and we're trying to keep the exact integer mapping as an internal implementation detail so that we can change the unit value in the future without breaking the SPI consumers). If we say that the consumers declare a floating point value relative to 1.0 as standard- then we can come back later and say long UNIT_VALUE = 1_000 to change the expressible relative granularity and nothing breaks.

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch 2 times, most recently from 0ed10a9 to 65784f6 Compare October 5, 2021 12:46
Member

It seems weird that this method would have side effects (clearLocalSplitInfo()) if the preconditions are not met (partitionedSplits != null & partitionedSplits.XXX >= 0)

What's the purpose of calling clearLocalSplitInfo under those conditions instead of just bailing out?

Member Author

The TaskPartitionedSplitCountTracker is sort of a specific sub-view of the global NodeTaskMap total split state per worker, so when we're going to bail out and throw an exception, we want to undo the effect of any task-specific values on the global state first. The existing logic was doing the same thing when/if a negative value was encountered for localPartitionedSplitCount; now it's been extended to consider negative weights or null arguments too.

Member

Is the race condition because we track count and weight separately? We should consider wrapping them in a holder object that gets updated atomically -- or just make the update to those fields atomic via synchronized. Reasoning about correctness under race conditions is hard.

Member Author

@pettyjamesm pettyjamesm Oct 7, 2021

The potential for a partially inconsistent view on this mostly comes from the worker side PipelineContext which increments / decrements AtomicLongs separately and also traverses non-snapshotable state as part of PipelineContext#getPipelineStatus. In that scenario, the race is basically unavoidable because you can't snapshot the whole pipeline state to get a consistent view without a huge locked region and significant performance risk.

In this case, we know that minor data races are possible but essentially benign. If what you observe in that snapshot is count=0, weight > 0, you know that with a tiny change to the timing you could also have observed either count=0, weight=0 or count=n, weight > 0. In that situation, choosing to discard the weight seemed preferable to synthesizing a fake split count to simulate the latter scenario.

Incidentally, the global NodeTaskMap can have similar inconsistent snapshot states which you could address by synchronizing updates, but that would also create a potentially concerning lock contention bottleneck since every task update and split assignment operation would need to serialize on that lock.
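An illustrative sketch of the benign-race handling discussed above (field and method names are hypothetical, not the actual PipelineContext internals): count and weight live in separate atomics, so a reader can transiently observe count == 0 while a stale weight is still visible; the snapshot discards the weight in that case rather than synthesizing a fake count.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: two independently-updated atomics can yield a
// momentarily inconsistent snapshot; the reader normalizes the one
// "impossible" combination (count == 0, weight > 0) instead of locking.
final class SplitCountersSketch
{
    final AtomicLong queuedCount = new AtomicLong();
    final AtomicLong queuedWeight = new AtomicLong();

    // returns {count, weight}, discarding a weight observed mid-update
    long[] snapshot()
    {
        long count = queuedCount.get();
        long weight = queuedWeight.get();
        if (count == 0) {
            weight = 0; // a tiny timing change would have shown weight == 0 too
        }
        return new long[] {count, weight};
    }
}
```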

Comment thread core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/execution/scheduler/NodeScheduler.java Outdated
Member

Why / 2.0 ? How was that chosen?

Member Author

The previous threshold was calculated at 1/2 of the maximum split count, so I preserved that when translating the logic over to use weights instead of counts.

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch 2 times, most recently from 17901a4 to c63a5bd Compare October 7, 2021 21:54
@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch from c63a5bd to 7e72ee7 Compare October 20, 2021 15:03
This change adds the notion of a SplitWeight to the trino-spi, a
concept which allows connectors to indicate the amount of work
associated with a given split, relative to some "standard" split.

Connectors can choose to assign that concept of a weight based on
whatever heuristics might be appropriate within the context of that
connector, such as size in bytes or some other metric that might
be known about the split at the time that it is generated. The
calculation need not be fully precise, but care should be taken
when implementing weight calculations in connectors to avoid using
weights that are very small or very large.

On the trino-main side, split weights are used to inform how many
splits are allowed to be running on a given node or queued on a
given task (still configured by: node-scheduler.max-splits-per-node
and node-scheduler.max-pending-splits-per-task respectively). Those
values are now interpreted to be relative to one "standard" split
weight. When all splits are assigned the standard weight, the
scheduler will behave the same way as before. However, when splits
are assigned smaller weights, the scheduler will allow
more of them to be assigned or queued to tasks. In effect, this
allows the coordinator to assign enough splits to workers for them
to stay busy between batches of split assignments arriving when
the amount of time that workers take to complete individual splits
is short.

In order to control the maximum number of splits that might be
delivered in a single task update request, the existing config
parameter node-scheduler.max-unacknowledged-splits-per-task still
controls the absolute count of splits that the coordinator will
allow to be assigned to a given task that have not yet been
acknowledged by the worker as having been received. This can be
especially important if splits themselves have a large serialized
JSON representation, in which case sending a large number of small
splits (by weight) could create huge task update requests to be
sent.
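The scheduling arithmetic described above can be sketched roughly as follows. All names and the internal unit value here are illustrative, not Trino's actual internals: the configured per-task split limit is interpreted as a multiple of one standard split weight, while the unacknowledged-splits limit remains an absolute count that bounds the size of a single task update request.

```java
// Hedged sketch of weight-based queue admission. With standard-weight splits
// this behaves exactly like a plain count limit; with small-weight splits the
// queue is allowed to be proportionally deeper, up to the absolute
// unacknowledged-split cap.
final class SchedulerLimitsSketch
{
    static final long STANDARD_WEIGHT = 100; // raw value of one standard split (illustrative)

    static boolean canQueueSplit(
            long queuedWeightOnTask,
            long splitWeight,
            int maxPendingSplitsPerTask,
            int unacknowledgedSplits,
            int maxUnacknowledgedSplitsPerTask)
    {
        // absolute cap on splits not yet acknowledged by the worker, which
        // bounds how large a single task update request can get
        if (unacknowledgedSplits >= maxUnacknowledgedSplitsPerTask) {
            return false;
        }
        // weight-based cap: "max pending splits" worth of standard weight may be queued
        long maxQueuedWeight = maxPendingSplitsPerTask * STANDARD_WEIGHT;
        return queuedWeightOnTask + splitWeight <= maxQueuedWeight;
    }
}
```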
Implements split weight assignments in trino-hive for HiveSplits
based on their size in bytes. This behavior is enabled by
default and can be disabled by setting the session property
`size_based_split_weights_enabled=false` or the hive configuration
property `hive.size-based-split-weights-enabled=false`.

When enabled, splits are assigned weights based on their size
in bytes relative to `hive.max-split-size`. Splits that are 1/2 of
the max split size will be weighted as 1/2 of the standard split
weight.

Splits that are larger in size than the hive target split size
(eg: unsplittable files) are still assigned the standard split weight
to avoid scheduler interactions that might be harmful when extremely
large weights are calculated. This is a conservative decision designed
to prevent existing workloads from regressing, but might be worth
revisiting in the future.

Finally, when size based split weights are enabled, no split will be
assigned a weight smaller than the value set by the
`minimum_assigned_split_weight` session property or the
`hive.minimum-assigned-split-weight` configuration property which
defaults to 0.05 (proportional to 1.0, the weight of a standard split).
This provides a mechanism to control how aggressively the scheduler
will respond to the presence of small files. With the default
configuration, files smaller than 5% of the target split size will
still be assigned 5% of the standard split weight, allowing a
maximum of 20x more splits to be running or queued.
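The weighting rule described in this commit message amounts to a simple clamp. A minimal sketch (the method name is illustrative, not the actual HiveSplit code): the proportion is size relative to the target split size, capped at 1.0 for oversized splits and floored at the configured minimum.

```java
// Sketch of size-based split weighting: proportion of the target split size,
// clamped to [minimumAssignedSplitWeight, 1.0].
final class HiveWeightSketch
{
    static double sizeBasedProportion(long splitSizeBytes, long targetSplitSizeBytes, double minimumAssignedSplitWeight)
    {
        double proportion = (double) splitSizeBytes / targetSplitSizeBytes;
        // oversized (eg: unsplittable) splits still get the standard weight
        proportion = Math.min(proportion, 1.0);
        // the floor controls how aggressively small files deepen the queues
        return Math.max(proportion, minimumAssignedSplitWeight);
    }
}
```

With the default floor of 0.05, a file at 1% of the target split size is still weighted at 5% of a standard split, which caps the queue-depth amplification at 20x.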
@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch from 7e72ee7 to 94f1fb1 Compare October 22, 2021 15:00
@pettyjamesm
Member Author

The original commits and follow up PR feedback changes have been rebased and squashed down to two commits, one for trino-main and trino-spi changes and another for the trino-hive connector side implementation.

@martint martint merged commit 770d6cd into trinodb:master Oct 23, 2021
@github-actions github-actions bot added this to the 364 milestone Oct 23, 2021
@pettyjamesm pettyjamesm deleted the weighted-split-scheduling branch November 1, 2021 21:15
Comment on lines +95 to +111
public static long rawValueForStandardSplitCount(int splitCount)
{
if (splitCount < 0) {
throw new IllegalArgumentException("splitCount must be >= 0, found: " + splitCount);
}
return multiplyExact(splitCount, UNIT_VALUE);
}

public static <T> long rawValueSum(Collection<T> collection, Function<T, SplitWeight> getter)
{
long sum = 0;
for (T item : collection) {
long value = getter.apply(item).getRawValue();
sum = addExact(sum, value);
}
return sum;
}
Member

Is it supposed to be part of SPI? is the connector supposed to invoke these methods? when?

Member Author

These are part of the SPI only incidentally because the engine needs to use these utilities in various components. They could be moved to trino-main, but then they'd be duplicated in different parts of the code that may or may not perform the same overflow validation.

Connectors should not need to know or care about the "raw value" representation and should instead express weights relative to a "standard weight". That is, they are supposed to use SplitWeight.fromProportion(double proportion), which takes any value relative to 1.0. Weights must be positive and non-zero, but otherwise connectors are free to express any value that makes sense for that connector to influence the number of splits the scheduler will assign to workers and tasks. Internally we use an integer representation, but that is an implementation detail of the engine, required to avoid accumulating errors from precision loss that would occur if we used the double representation within the engine itself and in the serialized JSON encoding.

Member

These are part of the SPI only incidentally because the engine needs to use these utilities in various components. They could be moved to trino-main, but then they'd be duplicated in different parts of the code that may or may not perform the same overflow validation.

Let's move them to a shared utility class to avoid duplication

Connectors should not need to know or care about the "raw value" representation, and should instead prefer to express weights relative to a "standard weight" which is to say, they are supposed to use SplitWeight.fromProportion(double proportion) which takes any value relative to 1.0.

Can you please add some javadoc? In particular, it's unclear "proportion to what" this is, and what's the range of allowed values.

Member Author

javadoc and scheduler configuration documentation added in #12684
