28 changes: 4 additions & 24 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -19,10 +19,6 @@

package org.apache.iceberg;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -37,6 +33,8 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,8 +45,6 @@
abstract class BaseTableScan implements TableScan {
private static final Logger LOG = LoggerFactory.getLogger(BaseTableScan.class);

private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

private final TableOperations ops;
private final Table table;
private final Schema schema;
@@ -132,19 +128,7 @@ public TableScan asOfTime(long timestampMillis) {
Preconditions.checkArgument(context.snapshotId() == null,
"Cannot override snapshot, already set to id=%s", context.snapshotId());

Long lastSnapshotId = null;
for (HistoryEntry logEntry : ops.current().snapshotLog()) {
if (logEntry.timestampMillis() <= timestampMillis) {
lastSnapshotId = logEntry.snapshotId();
}
}

// the snapshot ID could be null if no entries were older than the requested time. in that case,
// there is no valid snapshot to read.
Preconditions.checkArgument(lastSnapshotId != null,
"Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis));

return useSnapshot(lastSnapshotId);
return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
}

@Override
@@ -199,7 +183,7 @@ public CloseableIterable<FileScanTask> planFiles() {
Snapshot snapshot = snapshot();
if (snapshot != null) {
LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
snapshot.snapshotId(), DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
context.rowFilter());

Listeners.notifyAll(
@@ -304,8 +288,4 @@ private Schema lazyColumnProjection() {

return schema;
}

private static String formatTimestampMillis(long millis) {
return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
}
}
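
Note: asOfTime() now delegates snapshot resolution to SnapshotUtil.snapshotIdAsOfTime() instead of walking the snapshot log inline. A minimal time-travel sketch against the public TableScan API (HadoopTables and the table location are placeholders for illustration, not part of this change):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;

public class TimeTravelScanExample {
  public static void main(String[] args) throws Exception {
    // load an existing table; the location is a placeholder
    Table table = new HadoopTables(new Configuration()).load("hdfs://nn:8020/warehouse/db/events");

    // plan a scan as of one hour ago; this now fails with IllegalArgumentException
    // from SnapshotUtil if no snapshot is older than the requested timestamp
    long asOfMillis = System.currentTimeMillis() - 60 * 60 * 1000L;
    TableScan scan = table.newScan().asOfTime(asOfMillis);

    try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
      tasks.forEach(task -> System.out.println("planned task with " + task.files().size() + " files"));
    }
  }
}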
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -23,6 +23,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;

public class DataTableScan extends BaseTableScan {
@@ -62,6 +63,15 @@ public TableScan appendsAfter(long fromSnapshotId) {
return appendsBetween(fromSnapshotId, currentSnapshot.snapshotId());
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
// call method in superclass just for the side effect of argument validation;
// we do not use its return value
super.useSnapshot(scanSnapshotId);
Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
return newRefinedScan(tableOps(), table(), snapshotSchema, context().useSnapshotId(scanSnapshotId));
}

@Override
protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
return new DataTableScan(ops, table, schema, context);
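
The override above means a scan pinned to a snapshot reports the schema that snapshot was written with, not the table's current schema. A short sketch of the observable effect (the helper name is illustrative):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;

public class SnapshotScanSchema {
  // with this change, the returned schema is the snapshot's schema, which may
  // differ from table.schema() after columns were added, renamed, or dropped
  static Schema scanSchemaFor(Table table, long snapshotId) {
    TableScan scan = table.newScan().useSnapshot(snapshotId);
    return scan.schema();
  }
}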
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -25,12 +25,15 @@
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class DateTimeUtil {
private DateTimeUtil() {
}

private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
public static final long MICROS_PER_MILLIS = 1000L;
@@ -81,4 +84,8 @@ public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
public static long microsFromTimestamptz(OffsetDateTime dateTime) {
return ChronoUnit.MICROS.between(EPOCH, dateTime);
}

public static String formatTimestampMillis(long millis) {
return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
}
}
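
formatTimestampMillis() renders the instant in UTC with millisecond precision; a small sanity-check sketch (expected output derived from the pattern above):

import org.apache.iceberg.util.DateTimeUtil;

public class FormatTimestampExample {
  public static void main(String[] args) {
    // prints 1970-01-01 00:00:00.000
    System.out.println(DateTimeUtil.formatTimestampMillis(0L));
    // prints 2017-07-14 02:40:00.123
    System.out.println(DateTimeUtil.formatTimestampMillis(1_500_000_000_123L));
  }
}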
48 changes: 48 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
@@ -144,4 +146,50 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) {
throw new IllegalStateException(
String.format("Cannot find snapshot after %s: not an ancestor of table's current snapshot", snapshotId));
}

/**
* Returns the ID of the most recent snapshot for the table as of the timestamp.
*
* @param table a {@link Table}
* @param timestampMillis the timestamp in millis since the Unix epoch
* @return the snapshot ID
* @throws IllegalArgumentException when no snapshot is found in the table
* older than the timestamp
*/
public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
Long snapshotId = null;
for (HistoryEntry logEntry : table.history()) {
if (logEntry.timestampMillis() <= timestampMillis) {
snapshotId = logEntry.snapshotId();
}
}

Preconditions.checkArgument(snapshotId != null,
"Cannot find a snapshot older than %s", DateTimeUtil.formatTimestampMillis(timestampMillis));
return snapshotId;
}

/**
* Returns the schema of the table for the specified snapshot.
*
* @param table a {@link Table}
* @param snapshotId the ID of the snapshot
* @return the schema
*/
public static Schema schemaFor(Table table, long snapshotId) {
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Cannot find snapshot with ID %s", snapshotId);
Integer schemaId = snapshot.schemaId();

// schemaId could be null if the snapshot was created before Iceberg added schema IDs to snapshots
if (schemaId != null) {
Schema schema = table.schemas().get(schemaId);
Preconditions.checkState(schema != null,
"Cannot find schema with schema id %s", schemaId);
return schema;
}

// TODO: recover the schema by reading previous metadata files
return table.schema();
}
}
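
A sketch of how the two new helpers compose for time travel: resolve the snapshot that was current at a point in time, then fetch the schema it was written under (the wrapper method is illustrative):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

public class SnapshotUtilExample {
  // returns the schema to read with when scanning the table as of timestampMillis;
  // schemaFor() falls back to table.schema() when the snapshot predates
  // schema IDs in snapshot metadata
  static Schema schemaAsOfTime(Table table, long timestampMillis) {
    long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis);
    return SnapshotUtil.schemaFor(table, snapshotId);
  }
}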
@@ -68,8 +68,8 @@ public DataSourceReader createReader(StructType readSchema, DataSourceOptions op

Reader reader = new Reader(lazySparkSession(), table, Boolean.parseBoolean(caseSensitive), options);
if (readSchema != null) {
// convert() will fail if readSchema contains fields not in table.schema()
SparkSchemaUtil.convert(table.schema(), readSchema);
// convert() will fail if readSchema contains fields not in reader.snapshotSchema()
SparkSchemaUtil.convert(reader.snapshotSchema(), readSchema);
reader.pruneColumns(readSchema);
}

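
Validating the projection against reader.snapshotSchema() lets a time-travel read select columns as they existed in the older schema. A hedged Spark usage sketch (table identifier, timestamp, and column names are placeholders; the option name follows the documented Iceberg read options):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TimeTravelRead {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("iceberg-time-travel").getOrCreate();

    // read the table as of a timestamp (millis since epoch); the requested
    // projection is now checked against the snapshot schema, not table.schema()
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("as-of-timestamp", "499162860000")
        .load("db.events");

    df.select("id", "event_time").show();
  }
}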
@@ -86,14 +86,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private final JavaSparkContext sparkContext;
private final Table table;
private final SparkReadConf readConf;
private final Long snapshotId;
private final Long startSnapshotId;
private final Long endSnapshotId;
private final Long asOfTimestamp;
private final Long splitSize;
private final Integer splitLookback;
private final Long splitOpenFileCost;
private final boolean caseSensitive;
private final TableScan baseScan;
private StructType requestedSchema = null;
private List<Expression> filterExpressions = null;
private Filter[] pushedFilters = NO_FILTERS;
@@ -111,31 +104,9 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.readConf = new SparkReadConf(spark, table, options.asMap());
this.snapshotId = readConf.snapshotId();
this.asOfTimestamp = readConf.asOfTimestamp();
if (snapshotId != null && asOfTimestamp != null) {
throw new IllegalArgumentException(
"Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
}

this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
if (snapshotId != null || asOfTimestamp != null) {
if (startSnapshotId != null || endSnapshotId != null) {
throw new IllegalArgumentException(
"Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
"as-of-timestamp is specified");
}
} else {
if (startSnapshotId == null && endSnapshotId != null) {
throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
}
}

// look for split behavior overrides in options
this.splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
this.splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
this.splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
this.baseScan = configureBaseScan(caseSensitive, options);
this.schema = baseScan.schema();

if (table.io() instanceof HadoopFileIO) {
String fsscheme = "no_exist";
@@ -157,18 +128,84 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
this.localityPreferred = false;
}

this.schema = table.schema();
this.caseSensitive = caseSensitive;
this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
}

private void validateOptions(
Long snapshotId, Long asOfTimestamp, Long startSnapshotId, Long endSnapshotId) {
if (snapshotId != null && asOfTimestamp != null) {
throw new IllegalArgumentException(
"Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
}

if ((snapshotId != null || asOfTimestamp != null) &&
(startSnapshotId != null || endSnapshotId != null)) {
throw new IllegalArgumentException(
"Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
"as-of-timestamp is specified");
}

if (startSnapshotId == null && endSnapshotId != null) {
throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
}
}

private TableScan configureBaseScan(boolean caseSensitive, DataSourceOptions options) {
Long snapshotId = readConf.snapshotId();
Long asOfTimestamp = readConf.asOfTimestamp();
Long startSnapshotId = readConf.startSnapshotId();
Long endSnapshotId = readConf.endSnapshotId();
validateOptions(snapshotId, asOfTimestamp, startSnapshotId, endSnapshotId);

TableScan scan = table.newScan().caseSensitive(caseSensitive);

if (snapshotId != null) {
scan = scan.useSnapshot(snapshotId);
}

if (asOfTimestamp != null) {
scan = scan.asOfTime(asOfTimestamp);
}

if (startSnapshotId != null) {
if (endSnapshotId != null) {
scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
} else {
scan = scan.appendsAfter(startSnapshotId);
}
}

// look for split behavior overrides in options
Long splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
if (splitSize != null) {
scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
}

Integer splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
if (splitLookback != null) {
scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
}

Long splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
if (splitOpenFileCost != null) {
scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
}

return scan;
}

protected Schema snapshotSchema() {
return baseScan.schema();
}

private Schema lazySchema() {
if (schema == null) {
if (requestedSchema != null) {
// the projection should include all columns that will be returned, including those only used in filters
this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema, filterExpression(), caseSensitive);
this.schema = SparkSchemaUtil.prune(
baseScan.schema(), requestedSchema, filterExpression(), baseScan.isCaseSensitive());
} else {
this.schema = table.schema();
this.schema = baseScan.schema();
}
}
return schema;
@@ -211,6 +248,7 @@ public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));

List<CombinedScanTask> scanTasks = tasks();
boolean caseSensitive = baseScan.isCaseSensitive();
InputPartition<ColumnarBatch>[] readTasks = new InputPartition[scanTasks.size()];

Tasks.range(readTasks.length)
@@ -235,6 +273,7 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));

List<CombinedScanTask> scanTasks = tasks();
boolean caseSensitive = baseScan.isCaseSensitive();
InputPartition<InternalRow>[] readTasks = new InputPartition[scanTasks.size()];

Tasks.range(readTasks.length)
@@ -378,38 +417,7 @@ private static void mergeIcebergHadoopConfs(

private List<CombinedScanTask> tasks() {
if (tasks == null) {
TableScan scan = table
.newScan()
.caseSensitive(caseSensitive)
.project(lazySchema());

if (snapshotId != null) {
scan = scan.useSnapshot(snapshotId);
}

if (asOfTimestamp != null) {
scan = scan.asOfTime(asOfTimestamp);
}

if (startSnapshotId != null) {
if (endSnapshotId != null) {
scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
} else {
scan = scan.appendsAfter(startSnapshotId);
}
}

if (splitSize != null) {
scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
}

if (splitLookback != null) {
scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
}

if (splitOpenFileCost != null) {
scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
}
TableScan scan = baseScan.project(lazySchema());

if (filterExpressions != null) {
for (Expression filter : filterExpressions) {
@@ -430,8 +438,8 @@ private List<CombinedScanTask> tasks() {
@Override
public String toString() {
return String.format(
"IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s, batchedReads=%s)",
table, lazySchema().asStruct(), filterExpressions, caseSensitive, enableBatchRead());
"IcebergScan(table=%s, type=%s, filters=%s, batchedReads=%s)",
table, lazySchema().asStruct(), filterExpressions, enableBatchRead());
}

private static class ReadTask<T> implements Serializable, InputPartition<T> {
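
configureBaseScan() now builds and validates the TableScan once in the constructor, so tasks() only adds the projection and row filters. A sketch of an incremental append read using the same options the reader consumes (identifiers, snapshot IDs, and the split override are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IncrementalAppendRead {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("iceberg-incremental").getOrCreate();

    // appends after start-snapshot-id up to end-snapshot-id; per validateOptions(),
    // end-snapshot-id alone, or combining these with snapshot-id/as-of-timestamp, is rejected
    Dataset<Row> appends = spark.read()
        .format("iceberg")
        .option("start-snapshot-id", "10963874102873")
        .option("end-snapshot-id", "63874143573109")
        .option("split-size", String.valueOf(128L * 1024 * 1024))
        .load("db.events");

    appends.show();
  }
}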