From 492c1990a67665fccc7633fbcebe3765d480aee4 Mon Sep 17 00:00:00 2001 From: Prashant Singh Date: Tue, 25 Apr 2023 09:04:32 -0700 Subject: [PATCH] Spark: Refactor SparkReadConf to use primitive type --- .../java/org/apache/iceberg/spark/SparkReadConf.java | 2 +- .../iceberg/spark/source/SparkMicroBatchStream.java | 8 ++++---- .../java/org/apache/iceberg/spark/SparkReadConf.java | 2 +- .../iceberg/spark/source/SparkMicroBatchStream.java | 8 ++++---- .../java/org/apache/iceberg/spark/SparkReadConf.java | 6 +++--- .../iceberg/spark/source/SparkMicroBatchStream.java | 12 ++++++------ .../java/org/apache/iceberg/spark/SparkReadConf.java | 2 +- .../iceberg/spark/source/SparkMicroBatchStream.java | 8 ++++---- 8 files changed, 24 insertions(+), 24 deletions(-) diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 2ade02bbd023..d5833a9d07fd 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -201,7 +201,7 @@ public boolean handleTimestampWithoutZone() { .parse(); } - public Long streamFromTimestamp() { + public long streamFromTimestamp() { return confParser .longConf() .option(SparkReadOptions.STREAM_FROM_TIMESTAMP) diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 1ed3b64f0297..ebac79ebcb17 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -69,14 +69,14 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final boolean caseSensitive; private final String expectedSchema; private final Broadcast tableBroadcast; - private 
final Long splitSize; - private final Integer splitLookback; - private final Long splitOpenFileCost; + private final long splitSize; + private final int splitLookback; + private final long splitOpenFileCost; private final boolean localityPreferred; private final StreamingOffset initialOffset; private final boolean skipDelete; private final boolean skipOverwrite; - private final Long fromTimestamp; + private final long fromTimestamp; SparkMicroBatchStream( JavaSparkContext sparkContext, diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 8fbd11c9d997..fda807431a31 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -201,7 +201,7 @@ public boolean handleTimestampWithoutZone() { .parse(); } - public Long streamFromTimestamp() { + public long streamFromTimestamp() { return confParser .longConf() .option(SparkReadOptions.STREAM_FROM_TIMESTAMP) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 816e3d2bf8e5..c4de2ddfe6ea 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -70,14 +70,14 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final boolean caseSensitive; private final String expectedSchema; private final Broadcast<Table>
tableBroadcast; - private final Long splitSize; - private final Integer splitLookback; - private final Long splitOpenFileCost; + private final long splitSize; + private final int splitLookback; + private final long splitOpenFileCost; private final boolean localityPreferred; private final StreamingOffset initialOffset; private final boolean skipDelete; private final boolean skipOverwrite; - private final Long fromTimestamp; + private final long fromTimestamp; SparkMicroBatchStream( JavaSparkContext sparkContext, diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index dbd2613dded7..0f24844414fe 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -239,7 +239,7 @@ public boolean handleTimestampWithoutZone() { .parse(); } - public Long streamFromTimestamp() { + public long streamFromTimestamp() { return confParser .longConf() .option(SparkReadOptions.STREAM_FROM_TIMESTAMP) @@ -255,7 +255,7 @@ public Long endTimestamp() { return confParser.longConf().option(SparkReadOptions.END_TIMESTAMP).parseOptional(); } - public Integer maxFilesPerMicroBatch() { + public int maxFilesPerMicroBatch() { return confParser .intConf() .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH) @@ -263,7 +263,7 @@ public Integer maxFilesPerMicroBatch() { .parse(); } - public Integer maxRecordsPerMicroBatch() { + public int maxRecordsPerMicroBatch() { return confParser .intConf() .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index e1a8ef59129c..4019fedcbbfa 100644 --- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -77,16 +77,16 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio private final boolean caseSensitive; private final String expectedSchema; private final Broadcast<Table>
tableBroadcast; - private final Long splitSize; - private final Integer splitLookback; - private final Long splitOpenFileCost; + private final long splitSize; + private final int splitLookback; + private final long splitOpenFileCost; private final boolean localityPreferred; private final StreamingOffset initialOffset; private final boolean skipDelete; private final boolean skipOverwrite; - private final Long fromTimestamp; - private final Integer maxFilesPerMicroBatch; - private final Integer maxRecordsPerMicroBatch; + private final long fromTimestamp; + private final int maxFilesPerMicroBatch; + private final int maxRecordsPerMicroBatch; SparkMicroBatchStream( JavaSparkContext sparkContext, diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 1c1182c4da60..b91270f166ec 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -239,7 +239,7 @@ public boolean handleTimestampWithoutZone() { .parse(); } - public Long streamFromTimestamp() { + public long streamFromTimestamp() { return confParser .longConf() .option(SparkReadOptions.STREAM_FROM_TIMESTAMP) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 6e03dd69a850..728acbe6463e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -72,14 +72,14 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final boolean caseSensitive; private final String expectedSchema; private final Broadcast<Table>
tableBroadcast; - private final Long splitSize; - private final Integer splitLookback; - private final Long splitOpenFileCost; + private final long splitSize; + private final int splitLookback; + private final long splitOpenFileCost; private final boolean localityPreferred; private final StreamingOffset initialOffset; private final boolean skipDelete; private final boolean skipOverwrite; - private final Long fromTimestamp; + private final long fromTimestamp; SparkMicroBatchStream( JavaSparkContext sparkContext,