Skip to content

Commit 90da7dc

Browse files
kiszkcloud-fan
authored andcommitted
[SPARK-24452][SQL][CORE] Avoid possible overflow in int add or multiple
## What changes were proposed in this pull request? This PR fixes possible overflow in int add or multiply. In particular, their overflows in multiply are detected by [Spotbugs](https://spotbugs.github.io/) The following assignments may cause overflow in right hand side. As a result, the result may be negative. ``` long = int * int long = int + int ``` To avoid this problem, this PR performs cast from int to long in right hand side. ## How was this patch tested? Existing UTs. Author: Kazuaki Ishizaki <[email protected]> Closes #21481 from kiszk/SPARK-24452.
1 parent b5ccf0d commit 90da7dc

File tree

11 files changed

+73
-73
lines changed

11 files changed

+73
-73
lines changed

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
703703
// must be stored in the same memory page.
704704
// (8 byte key length) (key) (value) (8 byte pointer to next value)
705705
int uaoSize = UnsafeAlignedOffset.getUaoSize();
706-
final long recordLength = (2 * uaoSize) + klen + vlen + 8;
706+
final long recordLength = (2L * uaoSize) + klen + vlen + 8;
707707
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
708708
if (!acquireNewPage(recordLength + uaoSize)) {
709709
return false;

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ private[deploy] class DriverRunner(
225225
// check if attempting another run
226226
keepTrying = supervise && exitCode != 0 && !killed
227227
if (keepTrying) {
228-
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
228+
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000L) {
229229
waitSeconds = 1
230230
}
231231
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
9595
// the left side of max is >=1 whenever partsScanned >= 2
9696
numPartsToTry = Math.max(1,
9797
(1.5 * num * partsScanned / results.size).toInt - partsScanned)
98-
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
98+
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4L)
9999
}
100100
}
101101

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ private[spark] class BlockManager(
291291
case e: Exception if i < MAX_ATTEMPTS =>
292292
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
293293
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
294-
Thread.sleep(SLEEP_TIME_SECS * 1000)
294+
Thread.sleep(SLEEP_TIME_SECS * 1000L)
295295
case NonFatal(e) =>
296296
throw new SparkException("Unable to register with external shuffle server due to : " +
297297
e.getMessage, e)

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public static long calculateSizeOfUnderlyingByteArray(long numFields, int elemen
8383
private long elementOffset;
8484

8585
private long getElementOffset(int ordinal, int elementSize) {
86-
return elementOffset + ordinal * elementSize;
86+
return elementOffset + ordinal * (long)elementSize;
8787
}
8888

8989
public Object getBaseObject() { return baseObject; }
@@ -414,46 +414,46 @@ public byte[] toByteArray() {
414414
public short[] toShortArray() {
415415
short[] values = new short[numElements];
416416
Platform.copyMemory(
417-
baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2);
417+
baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2L);
418418
return values;
419419
}
420420

421421
@Override
422422
public int[] toIntArray() {
423423
int[] values = new int[numElements];
424424
Platform.copyMemory(
425-
baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4);
425+
baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4L);
426426
return values;
427427
}
428428

429429
@Override
430430
public long[] toLongArray() {
431431
long[] values = new long[numElements];
432432
Platform.copyMemory(
433-
baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8);
433+
baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8L);
434434
return values;
435435
}
436436

437437
@Override
438438
public float[] toFloatArray() {
439439
float[] values = new float[numElements];
440440
Platform.copyMemory(
441-
baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4);
441+
baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4L);
442442
return values;
443443
}
444444

445445
@Override
446446
public double[] toDoubleArray() {
447447
double[] values = new double[numElements];
448448
Platform.copyMemory(
449-
baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8);
449+
baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8L);
450450
return values;
451451
}
452452

453453
private static UnsafeArrayData fromPrimitiveArray(
454454
Object arr, int offset, int length, int elementSize) {
455455
final long headerInBytes = calculateHeaderPortionInBytes(length);
456-
final long valueRegionInBytes = elementSize * length;
456+
final long valueRegionInBytes = (long)elementSize * length;
457457
final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
458458
if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
459459
throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
4141
@Override
4242
public UnsafeRow appendRow(Object kbase, long koff, int klen,
4343
Object vbase, long voff, int vlen) {
44-
final long recordLength = 8 + klen + vlen + 8;
44+
final long recordLength = 8L + klen + vlen + 8;
4545
// if run out of max supported rows or page size, return null
4646
if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) {
4747
return null;

0 commit comments

Comments
 (0)