diff --git a/core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java b/core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java index c8d91598de9e..fdd2b0dc3531 100644 --- a/core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java +++ b/core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java @@ -764,17 +764,13 @@ private void collectPartitioned(TaskId taskId, Domain domain) Domain summary = summaryDomains.poll(); // summary can be null as another concurrent summary compaction may be running if (summary != null) { - long originalSize = summary.getRetainedSizeInBytes(); - if (summary.getRetainedSizeInBytes() > domainSizeLimitInBytes) { - summary = summary.simplify(1); - } - if (summary.getRetainedSizeInBytes() > domainSizeLimitInBytes) { + long summarySize = summary.getRetainedSizeInBytes(); + if (summarySize > domainSizeLimitInBytes) { sizeLimitExceeded = true; allDomain = Domain.all(summary.getType()); - summaryDomainsRetainedSizeInBytes.addAndGet(-originalSize); + summaryDomainsRetainedSizeInBytes.addAndGet(-summarySize); } else { - summaryDomainsRetainedSizeInBytes.addAndGet(summary.getRetainedSizeInBytes() - originalSize); summaryDomains.add(summary); } } @@ -828,6 +824,10 @@ private void unionSummaryDomainsIfNecessary(boolean force) } Domain union = union(domains); + // Avoid large unions with domains that exceed size limit + if ((summaryDomainsRetainedSizeInBytes.get() - domainsRetainedSizeInBytes + union.getRetainedSizeInBytes()) > domainSizeLimitInBytes) { + union = union.simplify(1); + } summaryDomainsRetainedSizeInBytes.addAndGet(union.getRetainedSizeInBytes() - domainsRetainedSizeInBytes); long currentSize = summaryDomainsRetainedSizeInBytes.get(); verify(currentSize >= 0, "currentSize is expected to be greater than or equal to zero: %s", currentSize); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalDynamicFilterConsumer.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalDynamicFilterConsumer.java index aacc1691608c..6317dc7a3929 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalDynamicFilterConsumer.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalDynamicFilterConsumer.java @@ -112,16 +112,12 @@ public void addPartition(TupleDomain domain) TupleDomain summary = summaryDomains.poll(); // summary can be null as another concurrent summary compaction may be running if (summary != null) { - long originalSize = getRetainedSizeInBytes(summary); - if (originalSize > domainSizeLimitInBytes) { - summary = summary.simplify(1); - } - if (getRetainedSizeInBytes(summary) > domainSizeLimitInBytes) { - summaryDomainsRetainedSizeInBytes.addAndGet(-originalSize); + long summarySize = getRetainedSizeInBytes(summary); + if (summarySize > domainSizeLimitInBytes) { + summaryDomainsRetainedSizeInBytes.addAndGet(-summarySize); sizeLimitExceeded = true; } else { - summaryDomainsRetainedSizeInBytes.addAndGet(getRetainedSizeInBytes(summary) - originalSize); summaryDomains.add(summary); } } @@ -201,7 +197,13 @@ private void unionSummaryDomainsIfNecessary(boolean force) } TupleDomain union = columnWiseUnion(domains); - summaryDomainsRetainedSizeInBytes.addAndGet(getRetainedSizeInBytes(union) - domainsRetainedSizeInBytes); + long unionSize = getRetainedSizeInBytes(union); + // Avoid large unions with domains that exceed size limit + if ((summaryDomainsRetainedSizeInBytes.get() - domainsRetainedSizeInBytes + unionSize) > domainSizeLimitInBytes) { + union = union.simplify(1); + unionSize = getRetainedSizeInBytes(union); + } + summaryDomainsRetainedSizeInBytes.addAndGet(unionSize - domainsRetainedSizeInBytes); long currentSize = summaryDomainsRetainedSizeInBytes.get(); verify(currentSize >= 0, "currentSize is expected to be greater than or equal to zero: %s", currentSize); summaryDomains.add(union);