@@ -101,6 +101,10 @@ public class InternalResourceGroup
101101 @ GuardedBy ("root" )
102102 private long cpuQuotaGenerationMillisPerSecond = Long .MAX_VALUE ;
103103 @ GuardedBy ("root" )
104+ private long hardPhysicalDataScanLimitBytes = Long .MAX_VALUE ;
105+ @ GuardedBy ("root" )
106+ private long physicalDataScanQuotaGenerationBytesPerSecond = Long .MAX_VALUE ;
107+ @ GuardedBy ("root" )
104108 private int schedulingWeight = DEFAULT_WEIGHT ;
105109 @ GuardedBy ("root" )
106110 private SchedulingPolicy schedulingPolicy = FAIR ;
@@ -128,9 +132,9 @@ public class InternalResourceGroup
128132 private int descendantRunningQueries ;
129133 @ GuardedBy ("root" )
130134 private int descendantQueuedQueries ;
131- // CPU and memory usage is cached because it changes very rapidly while queries are running, and would be expensive to track continuously
135+ // CPU, memory and physical data input usage is cached because it changes very rapidly while queries are running, and would be expensive to track continuously
132136 @ GuardedBy ("root" )
133- private ResourceUsage cachedResourceUsage = new ResourceUsage (0 , 0 );
137+ private ResourceUsage cachedResourceUsage = new ResourceUsage (0 , 0 , 0 );
134138 @ GuardedBy ("root" )
135139 private long lastStartMillis ;
136140 private final CounterStat timeBetweenStartsSec = new CounterStat ();
@@ -168,9 +172,11 @@ public ResourceGroupInfo getFullInfo()
168172 succinctBytes (softMemoryLimitBytes ),
169173 softConcurrencyLimit ,
170174 hardConcurrencyLimit ,
175+ succinctBytes (hardPhysicalDataScanLimitBytes ),
171176 maxQueuedQueries ,
172177 succinctBytes (cachedResourceUsage .getMemoryUsageBytes ()),
173178 succinctDuration (cachedResourceUsage .getCpuUsageMillis (), MILLISECONDS ),
179+ succinctBytes (cachedResourceUsage .getPhysicalInputDataUsageBytes ()),
174180 getQueuedQueries (),
175181 getRunningQueries (),
176182 eligibleSubGroups .size (),
@@ -193,9 +199,11 @@ public ResourceGroupInfo getInfo()
193199 succinctBytes (softMemoryLimitBytes ),
194200 softConcurrencyLimit ,
195201 hardConcurrencyLimit ,
202+ succinctBytes (hardPhysicalDataScanLimitBytes ),
196203 maxQueuedQueries ,
197204 succinctBytes (cachedResourceUsage .getMemoryUsageBytes ()),
198205 succinctDuration (cachedResourceUsage .getCpuUsageMillis (), MILLISECONDS ),
206+ succinctBytes (cachedResourceUsage .getPhysicalInputDataUsageBytes ()),
199207 getQueuedQueries (),
200208 getRunningQueries (),
201209 eligibleSubGroups .size (),
@@ -218,9 +226,11 @@ private ResourceGroupInfo getSummaryInfo()
218226 succinctBytes (softMemoryLimitBytes ),
219227 softConcurrencyLimit ,
220228 hardConcurrencyLimit ,
229+ succinctBytes (hardPhysicalDataScanLimitBytes ),
221230 maxQueuedQueries ,
222231 succinctBytes (cachedResourceUsage .getMemoryUsageBytes ()),
223232 succinctDuration (cachedResourceUsage .getCpuUsageMillis (), MILLISECONDS ),
233+ succinctBytes (cachedResourceUsage .getPhysicalInputDataUsageBytes ()),
224234 getQueuedQueries (),
225235 getRunningQueries (),
226236 eligibleSubGroups .size (),
@@ -342,6 +352,12 @@ public long getMemoryUsageBytes()
342352 return getResourceUsageSnapshot ().getMemoryUsageBytes ();
343353 }
344354
355+ @ Managed
356+ public long getPhysicalInputDataUsageBytes ()
357+ {
358+ return getResourceUsageSnapshot ().getPhysicalInputDataUsageBytes ();
359+ }
360+
345361 @ Managed
346362 @ Override
347363 public long getSoftMemoryLimitBytes ()
@@ -443,6 +459,45 @@ public void setCpuQuotaGenerationMillisPerSecond(long rate)
443459 }
444460 }
445461
462+ @ Managed
463+ @ Override
464+ public long getHardPhysicalDataScanLimitBytes ()
465+ {
466+ synchronized (root ) {
467+ return hardPhysicalDataScanLimitBytes ;
468+ }
469+ }
470+
471+ @ Override
472+ public void setHardPhysicalDataScanLimitBytes (long limit )
473+ {
474+ synchronized (root ) {
475+ boolean oldCanRun = canRunMore ();
476+ this .hardPhysicalDataScanLimitBytes = limit ;
477+ if (canRunMore () != oldCanRun ) {
478+ updateEligibility ();
479+ }
480+ }
481+ }
482+
483+ @ Managed
484+ @ Override
485+ public long getPhysicalDataScanQuotaGenerationBytesPerSecond ()
486+ {
487+ synchronized (root ) {
488+ return physicalDataScanQuotaGenerationBytesPerSecond ;
489+ }
490+ }
491+
492+ @ Override
493+ public void setPhysicalDataScanQuotaGenerationBytesPerSecond (long rate )
494+ {
495+ checkArgument (rate > 0 , "Physical data scan quota generation must be positive" );
496+ synchronized (root ) {
497+ physicalDataScanQuotaGenerationBytesPerSecond = rate ;
498+ }
499+ }
500+
446501 @ Managed
447502 @ Override
448503 public int getSoftConcurrencyLimit ()
@@ -731,7 +786,7 @@ private void startInBackground(ManagedQueryExecution query)
731786 {
732787 checkState (Thread .holdsLock (root ), "Must hold lock to start a query" );
733788 synchronized (root ) {
734- runningQueries .put (query , new ResourceUsage (0 , 0 ));
789+ runningQueries .put (query , new ResourceUsage (0 , 0 , 0 ));
735790 InternalResourceGroup group = this ;
736791 group .getStartedQueries ().update (1 );
737792 while (group .parent .isPresent ()) {
@@ -757,11 +812,11 @@ public void updateGroupsAndProcessQueuedQueries()
757812 }
758813 }
759814
760- public void generateCpuQuota (long elapsedSeconds )
815+ public void generateQuotas (long elapsedSeconds )
761816 {
762817 synchronized (root ) {
763818 if (elapsedSeconds > 0 ) {
764- internalGenerateCpuQuota (elapsedSeconds );
819+ internalGenerateQuotas (elapsedSeconds );
765820 }
766821 }
767822 }
@@ -784,12 +839,12 @@ private void queryFinished(ManagedQueryExecution query)
784839
785840 // The query is present in runningQueries
786841 if (lastUsage != null ) {
787- // CPU is measured cumulatively (i.e. total CPU used until this moment by the query). Memory is measured
788- // instantaneously (how much memory the query is using at this moment). At query completion, memory usage
789- // drops to zero.
842+ // CPU and data scan are measured cumulatively (i.e. total CPU & physical input data used until this moment by the query).
843+ // Memory is measured instantaneously (how much memory the query is using at this moment). At query completion, memory usage drops to zero.
790844 ResourceUsage finalUsage = new ResourceUsage (
791845 query .getTotalCpuTime ().toMillis (),
792- 0L );
846+ 0L ,
847+ query .getBasicQueryInfo ().getQueryStats ().getPhysicalInputDataSize ().toBytes ());
793848 ResourceUsage delta = finalUsage .subtract (lastUsage );
794849
795850 runningQueries .remove (query );
@@ -827,15 +882,16 @@ private ResourceUsage updateResourceUsageAndGetDelta()
827882 {
828883 checkState (Thread .holdsLock (root ), "Must hold lock to refresh stats" );
829884 synchronized (root ) {
830- ResourceUsage groupUsageDelta = new ResourceUsage (0 , 0 );
885+ ResourceUsage groupUsageDelta = new ResourceUsage (0 , 0 , 0 );
831886
832887 for (Map .Entry <ManagedQueryExecution , ResourceUsage > entry : runningQueries .entrySet ()) {
833888 ManagedQueryExecution query = entry .getKey ();
834889 ResourceUsage oldResourceUsage = entry .getValue ();
835890
836891 ResourceUsage newResourceUsage = new ResourceUsage (
837892 query .getTotalCpuTime ().toMillis (),
838- query .getTotalMemoryReservation ().toBytes ());
893+ query .getTotalMemoryReservation ().toBytes (),
894+ query .getBasicQueryInfo ().getQueryStats ().getPhysicalInputDataSize ().toBytes ());
839895
840896 // Compute delta and update usage
841897 ResourceUsage queryUsageDelta = newResourceUsage .subtract (oldResourceUsage );
@@ -850,7 +906,7 @@ private ResourceUsage updateResourceUsageAndGetDelta()
850906 groupUsageDelta = groupUsageDelta .add (subGroupUsageDelta );
851907 cachedResourceUsage = cachedResourceUsage .add (subGroupUsageDelta );
852908
853- if (!subGroupUsageDelta .equals (new ResourceUsage (0 , 0 ))) {
909+ if (!subGroupUsageDelta .equals (new ResourceUsage (0 , 0 , 0 ))) {
854910 subGroup .updateEligibility ();
855911 }
856912 }
@@ -859,32 +915,40 @@ private ResourceUsage updateResourceUsageAndGetDelta()
859915 }
860916 }
861917
862- private void internalGenerateCpuQuota (long elapsedSeconds )
918+ private void internalGenerateQuotas (long elapsedSeconds )
863919 {
864- checkState (Thread .holdsLock (root ), "Must hold lock to generate cpu quota " );
920+ checkState (Thread .holdsLock (root ), "Must hold lock to generate quotas " );
865921 synchronized (root ) {
866- long newQuota = saturatedMultiply (elapsedSeconds , cpuQuotaGenerationMillisPerSecond );
867-
868- long oldUsageMillis = cachedResourceUsage .getCpuUsageMillis ();
869- long newCpuUsageMillis = saturatedSubtract (oldUsageMillis , newQuota );
922+ long oldCpuUsageMillis = cachedResourceUsage .getCpuUsageMillis ();
923+ long oldDataScanBytes = cachedResourceUsage .getPhysicalInputDataUsageBytes ();
870924
871- if (newCpuUsageMillis < 0 || newCpuUsageMillis == Long .MAX_VALUE ) {
872- newCpuUsageMillis = 0 ;
873- }
874-
875- cachedResourceUsage = new ResourceUsage (newCpuUsageMillis , cachedResourceUsage .getMemoryUsageBytes ());
925+ long newCpuUsageMillis = computeNewUsage (oldCpuUsageMillis , elapsedSeconds , cpuQuotaGenerationMillisPerSecond );
926+ long newDataScanBytes = computeNewUsage (oldDataScanBytes , elapsedSeconds , physicalDataScanQuotaGenerationBytesPerSecond );
927+ cachedResourceUsage = new ResourceUsage (newCpuUsageMillis , cachedResourceUsage .getMemoryUsageBytes (), newDataScanBytes );
876928
877- if ((newCpuUsageMillis < hardCpuLimitMillis && oldUsageMillis >= hardCpuLimitMillis ) ||
878- (newCpuUsageMillis < softCpuLimitMillis && oldUsageMillis >= softCpuLimitMillis )) {
929+ if ((newCpuUsageMillis < hardCpuLimitMillis && oldCpuUsageMillis >= hardCpuLimitMillis ) ||
930+ (newCpuUsageMillis < softCpuLimitMillis && oldCpuUsageMillis >= softCpuLimitMillis ) ||
931+ (newDataScanBytes < hardPhysicalDataScanLimitBytes && oldDataScanBytes >= hardPhysicalDataScanLimitBytes )) {
879932 updateEligibility ();
880933 }
881934
882935 for (InternalResourceGroup group : subGroups .values ()) {
883- group .internalGenerateCpuQuota (elapsedSeconds );
936+ group .internalGenerateQuotas (elapsedSeconds );
884937 }
885938 }
886939 }
887940
941+ private static long computeNewUsage (long currentUsage , long elapsedSeconds , long generationRate )
942+ {
943+ long quotaToRegenerate = saturatedMultiply (elapsedSeconds , generationRate );
944+ long newUsage = saturatedSubtract (currentUsage , quotaToRegenerate );
945+
946+ if (newUsage < 0 || newUsage == Long .MAX_VALUE ) {
947+ newUsage = 0 ;
948+ }
949+ return newUsage ;
950+ }
951+
888952 private boolean internalStartNext ()
889953 {
890954 checkState (Thread .holdsLock (root ), "Must hold lock to find next query" );
@@ -996,8 +1060,8 @@ private boolean canRunMore()
9961060 synchronized (root ) {
9971061 long cpuUsageMillis = cachedResourceUsage .getCpuUsageMillis ();
9981062 long memoryUsageBytes = cachedResourceUsage .getMemoryUsageBytes ();
999-
1000- if ((cpuUsageMillis >= hardCpuLimitMillis ) || (memoryUsageBytes > softMemoryLimitBytes )) {
1063+ long physicalInputDataUsageBytes = cachedResourceUsage . getPhysicalInputDataUsageBytes ();
1064+ if ((cpuUsageMillis >= hardCpuLimitMillis ) || (memoryUsageBytes > softMemoryLimitBytes ) || ( physicalInputDataUsageBytes >= hardPhysicalDataScanLimitBytes ) ) {
10011065 return false ;
10021066 }
10031067
0 commit comments