diff --git a/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java b/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java
index 653c27313ec3..c6984e2fe8cd 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java
@@ -202,11 +202,6 @@ public Type primitive(Type.PrimitiveType primitive) {
             "Cannot project decimal with incompatible precision: %s < %s",
             requestedDecimal.precision(), decimal.precision());
         break;
-      case TIMESTAMP:
-        Types.TimestampType timestamp = (Types.TimestampType) primitive;
-        Preconditions.checkArgument(timestamp.shouldAdjustToUTC(),
-            "Cannot project timestamp (without time zone) as timestamptz (with time zone)");
-        break;
       default:
     }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
index b9e9dfade792..6a8be60eb078 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
@@ -104,12 +104,7 @@ public DataType primitive(Type.PrimitiveType primitive) {
         throw new UnsupportedOperationException(
             "Spark does not support time fields");
       case TIMESTAMP:
-        Types.TimestampType timestamp = (Types.TimestampType) primitive;
-        if (timestamp.shouldAdjustToUTC()) {
-          return TimestampType$.MODULE$;
-        }
-        throw new UnsupportedOperationException(
-            "Spark does not support timestamp without time zone fields");
+        return TimestampType$.MODULE$;
       case STRING:
         return StringType$.MODULE$;
       case UUID:
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
index 0c4598a209e8..3112c9304bbc 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
@@ -24,6 +24,7 @@
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
@@ -122,13 +123,19 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual)
         Assert.assertEquals("ISO-8601 date should be equal", expected.toString(), actual.toString());
         break;
       case TIMESTAMP:
-        Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime);
         Assert.assertTrue("Should be a Timestamp", actual instanceof Timestamp);
         Timestamp ts = (Timestamp) actual;
         // milliseconds from nanos has already been added by getTime
         OffsetDateTime actualTs = EPOCH.plusNanos(
             (ts.getTime() * 1_000_000) + (ts.getNanos() % 1_000_000));
-        Assert.assertEquals("Timestamp should be equal", expected, actualTs);
+        Types.TimestampType timestampType = (Types.TimestampType) type;
+        if (timestampType.shouldAdjustToUTC()) {
+          Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime);
+          Assert.assertEquals("Timestamp should be equal", expected, actualTs);
+        } else {
+          Assert.assertTrue("Should expect a LocalDateTime", expected instanceof LocalDateTime);
+          Assert.assertEquals("Timestamp should be equal", expected, actualTs.toLocalDateTime());
+        }
         break;
       case STRING:
         Assert.assertTrue("Should be a String", actual instanceof String);
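The TypeToSparkType change above maps both Iceberg timestamp variants to Spark's single TimestampType, whose values are instants (epoch microseconds). The reason this is only safe behind an opt-in, and the reason GenericsHelpers now compares without-zone values at UTC, is that converting a wall-clock value to an instant requires choosing a zone. A minimal, self-contained sketch of that ambiguity (class name hypothetical; only java.time is used):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

public class TimestampSemanticsSketch {
  public static void main(String[] args) {
    // A timestamp-without-zone value is pure wall-clock time.
    LocalDateTime wallClock = LocalDateTime.parse("2017-12-22T09:20:44.294658");

    // Turning it into epoch micros (Spark's TimestampType representation)
    // forces a zone choice; different zones produce different instants.
    long utcMicros = ChronoUnit.MICROS.between(
        Instant.EPOCH.atOffset(ZoneOffset.UTC),
        wallClock.atOffset(ZoneOffset.UTC));
    long laMicros = ChronoUnit.MICROS.between(
        Instant.EPOCH.atOffset(ZoneOffset.UTC),
        wallClock.atZone(ZoneId.of("America/Los_Angeles")).toOffsetDateTime());

    // Los Angeles is UTC-8 in December: the same wall-clock reading lands
    // eight hours later on the timeline.
    System.out.println(laMicros - utcMicros); // 28800000000
  }
}
```

Comparing at UTC, as the updated assertEqualsSafe does, makes the round trip zone-independent: the micros written are the wall-clock value interpreted as UTC, and converting back at UTC recovers the original LocalDateTime.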
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
new file mode 100644
index 000000000000..9b8e428e7430
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.GenericsHelpers;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.Files.localOutput;
+
+@RunWith(Parameterized.class)
+public abstract class TestTimestampWithoutZone {
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  private static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.LongType.get()),
+      Types.NestedField.optional(2, "ts", Types.TimestampType.withoutZone()),
+      Types.NestedField.optional(3, "data", Types.StringType.get())
+  );
+
+  private static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startSpark() {
+    TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestTimestampWithoutZone.spark;
+    TestTimestampWithoutZone.spark = null;
+    currentSpark.stop();
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private final String format;
+  private final boolean vectorized;
+
+  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        { "parquet", false },
+        { "parquet", true },
+        { "avro", false },
+        { "orc", false }
+    };
+  }
+
+  public TestTimestampWithoutZone(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private File parent = null;
+  private File unpartitioned = null;
+  private List<Record> records = null;
+
+  @Before
+  public void writeUnpartitionedTable() throws IOException {
+    this.parent = temp.newFolder("TestTimestampWithoutZone");
+    this.unpartitioned = new File(parent, "unpartitioned");
+    File dataFolder = new File(unpartitioned, "data");
+    Assert.assertTrue("Mkdir should succeed", dataFolder.mkdirs());
+
+    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), unpartitioned.toString());
+    Schema tableSchema = table.schema(); // use the table schema because ids are reassigned
+
+    FileFormat fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+
+    File testFile = new File(dataFolder, fileFormat.addExtension(UUID.randomUUID().toString()));
+
+    // create records using the table's schema
+    this.records = testRecords(tableSchema);
+
+    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(
+        localOutput(testFile), fileFormat)) {
+      writer.addAll(records);
+    }
+
+    DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withRecordCount(records.size())
+        .withFileSizeInBytes(testFile.length())
+        .withPath(testFile.toString())
+        .build();
+
+    table.newAppend().appendFile(file).commit();
+  }
+
+  @Test
+  public void testUnpartitionedTimestampWithoutZone() {
+    assertEqualsSafe(SCHEMA.asStruct(), records, read(unpartitioned.toString(), vectorized));
+  }
+
+  @Test
+  public void testUnpartitionedTimestampWithoutZoneProjection() {
+    Schema projection = SCHEMA.select("id", "ts");
+    assertEqualsSafe(projection.asStruct(),
+        records.stream().map(r -> projectFlat(projection, r)).collect(Collectors.toList()),
+        read(unpartitioned.toString(), vectorized, "id", "ts"));
+  }
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testUnpartitionedTimestampWithoutZoneError() {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Spark does not support timestamp without time zone fields");
+
+    spark.read().format("iceberg")
+        .option("vectorization-enabled", String.valueOf(vectorized))
+        .option("read-timestamp-without-zone", "false")
+        .load(unpartitioned.toString())
+        .collectAsList();
+  }
+
+  private static Record projectFlat(Schema projection, Record record) {
+    Record result = GenericRecord.create(projection);
+    List<Types.NestedField> fields = projection.asStruct().fields();
+    for (int i = 0; i < fields.size(); i += 1) {
+      Types.NestedField field = fields.get(i);
+      result.set(i, record.getField(field.name()));
+    }
+    return result;
+  }
+
+  public static void assertEqualsSafe(Types.StructType struct,
+                                      List<Record> expected, List<Row> actual) {
+    Assert.assertEquals("Number of results should match expected", expected.size(), actual.size());
+    for (int i = 0; i < expected.size(); i += 1) {
+      GenericsHelpers.assertEqualsSafe(struct, expected.get(i), actual.get(i));
+    }
+  }
+
+  private List<Record> testRecords(Schema schema) {
+    return Lists.newArrayList(
+        record(schema, 0L, parseToLocal("2017-12-22T09:20:44.294658"), "junction"),
+        record(schema, 1L, parseToLocal("2017-12-22T07:15:34.582910"), "alligator"),
+        record(schema, 2L, parseToLocal("2017-12-22T06:02:09.243857"), "forrest"),
parseToLocal("2017-12-22T06:02:09.243857"), "forrest"), + record(schema, 3L, parseToLocal("2017-12-22T03:10:11.134509"), "clapping"), + record(schema, 4L, parseToLocal("2017-12-22T00:34:00.184671"), "brush"), + record(schema, 5L, parseToLocal("2017-12-21T22:20:08.935889"), "trap"), + record(schema, 6L, parseToLocal("2017-12-21T21:55:30.589712"), "element"), + record(schema, 7L, parseToLocal("2017-12-21T17:31:14.532797"), "limited"), + record(schema, 8L, parseToLocal("2017-12-21T15:21:51.237521"), "global"), + record(schema, 9L, parseToLocal("2017-12-21T15:02:15.230570"), "goldfish") + ); + } + + private static List read(String table, boolean vectorized) { + return read(table, vectorized, "*"); + } + + private static List read(String table, boolean vectorized, String select0, String... selectN) { + Dataset dataset = spark.read().format("iceberg") + .option("vectorization-enabled", String.valueOf(vectorized)) + .option("read-timestamp-without-zone", "true") + .load(table) + .select(select0, selectN); + return dataset.collectAsList(); + } + + private static LocalDateTime parseToLocal(String timestamp) { + return LocalDateTime.parse(timestamp); + } + + private static Record record(Schema schema, Object... values) { + Record rec = GenericRecord.create(schema); + for (int i = 0; i < values.length; i += 1) { + rec.set(i, values[i]); + } + return rec; + } +} diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index 4b0723e7ee30..4a764818485f 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -43,14 +43,19 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.hive.legacy.LegacyHiveTable; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.orc.OrcRowFilterUtils; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkFilters; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.broadcast.Broadcast; @@ -70,6 +75,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; @@ -97,6 +103,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus private final boolean localityPreferred; private final boolean batchReadsEnabled; private final int batchSize; + private final boolean readTimestampWithoutZone; // lazy variables private Schema schema = null; @@ -136,7 +143,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus if (io.getValue() instanceof HadoopFileIO) { String fsscheme = "no_exist"; try { - Configuration conf = SparkSession.active().sessionState().newHadoopConf(); + Configuration conf = new Configuration(activeSparkSession().sessionState().newHadoopConf()); 
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 4b0723e7ee30..4a764818485f 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -43,14 +43,19 @@
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.hive.legacy.LegacyHiveTable;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.orc.OrcRowFilterUtils;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.broadcast.Broadcast;
@@ -70,6 +75,7 @@
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
 
@@ -97,6 +103,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private final boolean localityPreferred;
   private final boolean batchReadsEnabled;
   private final int batchSize;
+  private final boolean readTimestampWithoutZone;
 
   // lazy variables
   private Schema schema = null;
@@ -136,7 +143,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     if (io.getValue() instanceof HadoopFileIO) {
       String fsscheme = "no_exist";
       try {
-        Configuration conf = SparkSession.active().sessionState().newHadoopConf();
+        Configuration conf = new Configuration(activeSparkSession().sessionState().newHadoopConf());
         // merge hadoop config set on table
         mergeIcebergHadoopConfs(conf, table.properties());
         // merge hadoop config passed as options and overwrite the one on table
@@ -170,6 +177,13 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     this.batchSize = options.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE).map(Integer::parseInt).orElseGet(() ->
         PropertyUtil.propertyAsInt(table.properties(),
             TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT));
+    // Allow reading timestamp without time zone as timestamp with time zone. Generally, this is not safe as timestamp
+    // without time zone is supposed to represent wall clock time semantics, i.e. no matter the reader/writer timezone
+    // 3PM should always be read as 3PM, but timestamp with time zone represents instant semantics, i.e. the timestamp
+    // is adjusted so that the corresponding time in the reader timezone is displayed.
+    // When set to false (default), we throw an exception at runtime
+    // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields
+    this.readTimestampWithoutZone = options.get("read-timestamp-without-zone").map(Boolean::parseBoolean).orElse(false);
   }
 
   private Schema lazySchema() {
@@ -193,6 +207,8 @@ private Expression filterExpression() {
 
   private StructType lazyType() {
     if (type == null) {
+      Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(lazySchema()),
+          "Spark does not support timestamp without time zone fields");
       this.type = SparkSchemaUtil.convert(lazySchema());
     }
     return type;
@@ -290,16 +306,18 @@ public void pruneColumns(StructType newRequestedSchema) {
 
   @Override
   public Statistics estimateStatistics() {
-    // its a fresh table, no data
-    if (table.currentSnapshot() == null) {
-      return new Stats(0L, 0L);
-    }
+    if (!(table instanceof LegacyHiveTable)) {
+      // its a fresh table, no data
+      if (table.currentSnapshot() == null) {
+        return new Stats(0L, 0L);
+      }
 
-    // estimate stats using snapshot summary only for partitioned tables (metadata tables are unpartitioned)
-    if (!table.spec().isUnpartitioned() && filterExpression() == Expressions.alwaysTrue()) {
-      long totalRecords = PropertyUtil.propertyAsLong(table.currentSnapshot().summary(),
-          SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
-      return new Stats(SparkSchemaUtil.estimateSize(lazyType(), totalRecords), totalRecords);
+      // estimate stats using snapshot summary only for partitioned tables (metadata tables are unpartitioned)
+      if (!table.spec().isUnpartitioned() && filterExpression() == Expressions.alwaysTrue()) {
+        long totalRecords = PropertyUtil.propertyAsLong(table.currentSnapshot().summary(),
+            SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
+        return new Stats(SparkSchemaUtil.estimateSize(lazyType(), totalRecords), totalRecords);
+      }
     }
 
     long sizeInBytes = 0L;
@@ -332,18 +350,32 @@ public boolean enableBatchRead() {
               .allMatch(fileScanTask -> fileScanTask.file().format().equals(
                   FileFormat.ORC)));
 
+      boolean hasNoRowFilters =
+          tasks().stream()
+              .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
+                  .stream()
+                  .allMatch(fileScanTask -> OrcRowFilterUtils.rowFilterFromTask(fileScanTask) == null));
+
       boolean atLeastOneColumn = lazySchema().columns().size() > 0;
 
       boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());
 
+      boolean hasTimestampWithoutZone = hasTimestampWithoutZone(lazySchema());
+
       boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
 
-      this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
-          (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
+      this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && ((allOrcFileScanTasks && hasNoRowFilters) ||
+          (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone));
     }
     return readUsingBatch;
   }
 
+  private static boolean hasTimestampWithoutZone(Schema schema) {
+    return TypeUtil.find(schema, t ->
+        t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC()
+    ) != null;
+  }
+
   private static void mergeIcebergHadoopConfs(
       Configuration baseConf, Map<String, String> options) {
     options.keySet().stream()
@@ -470,11 +502,25 @@ private String[] getPreferredLocations() {
         return new String[0];
       }
 
-      Configuration conf = SparkSession.active().sparkContext().hadoopConfiguration();
+      Configuration conf = activeSparkSession().sparkContext().hadoopConfiguration();
       return Util.blockLocations(task, conf);
     }
   }
 
+  private static SparkSession activeSparkSession() {
+    final Option<SparkSession> activeSession = SparkSession.getActiveSession();
+    if (activeSession.isDefined()) {
+      return activeSession.get();
+    } else {
+      final Option<SparkSession> defaultSession = SparkSession.getDefaultSession();
+      if (defaultSession.isDefined()) {
+        return defaultSession.get();
+      } else {
+        throw new IllegalStateException("No active spark session found");
+      }
+    }
+  }
+
   private interface ReaderFactory<T> extends Serializable {
     InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
                                    String nameMapping, FileIO io,
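The guard added to Reader gates on the hasTimestampWithoutZone helper, and because TypeUtil.find walks nested structs, lists, and maps, it also catches a without-zone timestamp buried inside a complex type. The same helper reappears in the Spark 3 scan below. A standalone sketch of the check (class name and schema are illustrative; the predicate is copied from the patch):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class TimestampTypeCheck {
  // Same shape as the helper added in Reader and SparkBatchScan.
  static boolean hasTimestampWithoutZone(Schema schema) {
    return TypeUtil.find(schema, t ->
        t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC()
    ) != null;
  }

  public static void main(String[] args) {
    // A without-zone timestamp nested one level down is still detected.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "payload", Types.StructType.of(
            Types.NestedField.optional(3, "created", Types.TimestampType.withoutZone()))));

    System.out.println(hasTimestampWithoutZone(schema)); // true
  }
}
```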
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java
new file mode 100644
index 000000000000..84c7d1aeb733
--- /dev/null
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+public class TestTimestampWithoutZone24 extends TestTimestampWithoutZone {
+  public TestTimestampWithoutZone24(String format, boolean vectorized) {
+    super(format, vectorized);
+  }
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index efe94459410c..ff3a4460b9c7 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -37,8 +37,16 @@
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcRowFilterUtils;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.broadcast.Broadcast;
@@ -59,6 +67,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(SparkBatchScan.class);
 
+  private final Table table;
   private final boolean caseSensitive;
   private final boolean localityPreferred;
@@ -68,6 +77,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
   private final Broadcast<EncryptionManager> encryptionManager;
   private final boolean batchReadsEnabled;
   private final int batchSize;
+  private final boolean readTimestampWithoutZone;
 
   // lazy variables
   private StructType readSchema = null;
@@ -84,6 +94,13 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
     this.localityPreferred = Spark3Util.isLocalityEnabled(io.value(), table.location(), options);
     this.batchReadsEnabled = Spark3Util.isVectorizationEnabled(table.properties(), options);
     this.batchSize = Spark3Util.batchSize(table.properties(), options);
+    // Allow reading timestamp without time zone as timestamp with time zone. Generally, this is not safe as timestamp
+    // without time zone is supposed to represent wall clock time semantics, i.e. no matter the reader/writer timezone
+    // 3PM should always be read as 3PM, but timestamp with time zone represents instant semantics, i.e. the timestamp
+    // is adjusted so that the corresponding time in the reader timezone is displayed.
+    // When set to false (default), we throw an exception at runtime
+    // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields
+    this.readTimestampWithoutZone = options.getBoolean("read-timestamp-without-zone", false);
   }
 
   protected Table table() {
@@ -112,6 +129,8 @@ public Batch toBatch() {
   @Override
   public StructType readSchema() {
     if (readSchema == null) {
+      Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(expectedSchema),
+          "Spark does not support timestamp without time zone fields");
       this.readSchema = SparkSchemaUtil.convert(expectedSchema);
     }
     return readSchema;
@@ -150,24 +169,39 @@ public PartitionReaderFactory createReaderFactory() {
             .allMatch(fileScanTask -> fileScanTask.file().format().equals(
                 FileFormat.ORC)));
 
+    boolean hasNoRowFilters =
+        tasks().stream()
+            .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
+                .stream()
+                .allMatch(fileScanTask -> OrcRowFilterUtils.rowFilterFromTask(fileScanTask) == null));
+
     boolean atLeastOneColumn = expectedSchema.columns().size() > 0;
 
     boolean onlyPrimitives = expectedSchema.columns().stream().allMatch(c -> c.type().isPrimitiveType());
 
     boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
 
-    boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
-        (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
+    boolean hasTimestampWithoutZone = hasTimestampWithoutZone(expectedSchema);
+
+    boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && ((allOrcFileScanTasks && hasNoRowFilters) ||
+        (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone));
 
     return new ReaderFactory(readUsingBatch ? batchSize : 0);
   }
 
+  private static boolean hasTimestampWithoutZone(Schema schema) {
+    return TypeUtil.find(schema, t ->
+        t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC()
+    ) != null;
+  }
+
   @Override
   public Statistics estimateStatistics() {
-    // its a fresh table, no data
-    if (table.currentSnapshot() == null) {
-      return new Stats(0L, 0L);
-    }
+    if (!(table instanceof LegacyHiveTable)) {
+      // its a fresh table, no data
+      if (table.currentSnapshot() == null) {
+        return new Stats(0L, 0L);
+      }
 
     // estimate stats using snapshot summary only for partitioned tables (metadata tables are unpartitioned)
     if (!table.spec().isUnpartitioned() && filterExpressions.isEmpty()) {
@@ -178,6 +212,16 @@ public Statistics estimateStatistics() {
       return new Stats(
           SparkSchemaUtil.estimateSize(SparkSchemaUtil.convert(projectedSchema), totalRecords),
           totalRecords);
+      // estimate stats using snapshot summary only for partitioned tables (metadata tables are unpartitioned)
+      if (!table.spec().isUnpartitioned() && (filterExpressions == null || filterExpressions.isEmpty())) {
+        LOG.debug("using table metadata to estimate table statistics");
+        long totalRecords = PropertyUtil.propertyAsLong(table.currentSnapshot().summary(),
+            SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
+        Schema projectedSchema = expectedSchema != null ? expectedSchema : table.schema();
+        return new Stats(
+            SparkSchemaUtil.estimateSize(SparkSchemaUtil.convert(projectedSchema), totalRecords),
+            totalRecords);
+      }
     }
 
     long sizeInBytes = 0L;
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java
new file mode 100644
index 000000000000..4216aec02789
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+public class TestTimestampWithoutZone3 extends TestTimestampWithoutZone {
+  public TestTimestampWithoutZone3(String format, boolean vectorized) {
+    super(format, vectorized);
+  }
+}
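Taken together, the vectorization gate now has two extra inputs on top of the old condition: ORC batch reads additionally require that no task carries an ORC row filter, and Parquet batch reads are disabled when the projection contains a without-zone timestamp, presumably because the vectorized Parquet path has no handling for that type. A condensed, runnable restatement of the gate (method and class names are mine; the boolean names follow the patch):

```java
public class BatchReadGate {
  // Mirrors the condition in Reader.enableBatchRead() and
  // SparkBatchScan.createReaderFactory().
  static boolean readUsingBatch(
      boolean batchReadsEnabled, boolean hasNoDeleteFiles,
      boolean allOrcFileScanTasks, boolean hasNoRowFilters,
      boolean allParquetFileScanTasks, boolean atLeastOneColumn,
      boolean onlyPrimitives, boolean hasTimestampWithoutZone) {
    return batchReadsEnabled && hasNoDeleteFiles && ((allOrcFileScanTasks && hasNoRowFilters) ||
        (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone));
  }

  public static void main(String[] args) {
    // A Parquet scan whose projection includes a timestamp-without-zone column
    // falls back to the row-based reader even with vectorization enabled.
    System.out.println(readUsingBatch(true, true, false, true, true, true, true, true)); // false
  }
}
```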