diff --git a/presto-delta/pom.xml b/presto-delta/pom.xml
index da1965e03c041..f9202b0c86dcb 100644
--- a/presto-delta/pom.xml
+++ b/presto-delta/pom.xml
@@ -14,33 +14,61 @@
${project.parent.basedir}
true
+ 3.2.0
io.delta
- delta-standalone_2.12
- 0.6.0
+ delta-kernel-api
+ ${io.delta.delta-kernel-api.version}
- org.scala-lang
- scala-library
+ org.roaringbitmap
+ RoaringBitmap
- com.fasterxml.jackson.module
- jackson-module-scala_2.12
+ org.slf4j
+ slf4j-api
+
+
+
+ io.delta
+ delta-kernel-defaults
+ ${io.delta.delta-kernel-api.version}
+
com.fasterxml.jackson.core
- jackson-core
+ jackson-databind
- com.fasterxml.jackson.core
- jackson-annotations
+ org.apache.hadoop
+ hadoop-client-api
- com.fasterxml.jackson.core
- jackson-databind
+ org.roringbitmap
+ RoaringBitmap
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.apache.parquet
+ parquet-hadoop
+
+
+ commons-logging
+ commons-logging
+
+
+ javax.xml.bind
+ jaxb-api
+
+
+ org.apache.hadoop
+ hadoop-client-runtime
diff --git a/presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java b/presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
index 1dce78dda62af..bf8ff349c0d3e 100644
--- a/presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
+++ b/presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
@@ -19,28 +19,32 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
-import io.delta.standalone.DeltaLog;
-import io.delta.standalone.Snapshot;
-import io.delta.standalone.actions.AddFile;
-import io.delta.standalone.actions.Metadata;
-import io.delta.standalone.data.CloseableIterator;
+import com.facebook.presto.spi.StandardErrorCode;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.TableNotFoundException;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.utils.CloseableIterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import javax.inject.Inject;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.time.Instant;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
-import static com.facebook.presto.delta.DeltaErrorCode.DELTA_UNSUPPORTED_DATA_FORMAT;
import static com.facebook.presto.delta.DeltaTable.DataFormat.PARQUET;
-import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
-import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.Locale.US;
@@ -51,6 +55,7 @@
*/
public class DeltaClient
{
+ private static final String TABLE_NOT_FOUND_ERROR_TEMPLATE = "Delta table (%s.%s) no longer exists.";
private final HdfsEnvironment hdfsEnvironment;
@Inject
@@ -70,45 +75,68 @@ public DeltaClient(HdfsEnvironment hdfsEnvironment)
* @return If the table is found return {@link DeltaTable}.
*/
public Optional getTable(
+ DeltaConfig config,
ConnectorSession session,
SchemaTableName schemaTableName,
String tableLocation,
Optional snapshotId,
Optional snapshotAsOfTimestampMillis)
{
- Optional deltaLog = loadDeltaTableLog(session, new Path(tableLocation), schemaTableName);
- if (!deltaLog.isPresent()) {
+ Path location = new Path(tableLocation);
+ Optional deltaEngine = loadDeltaEngine(session, location, schemaTableName);
+ if (!deltaEngine.isPresent()) {
return Optional.empty();
}
+ Table deltaTable = loadDeltaTable(location.toString(), deltaEngine.get());
+ Snapshot snapshot = getSnapshot(deltaTable, deltaEngine.get(), schemaTableName, snapshotId,
+ snapshotAsOfTimestampMillis);
+ return Optional.of(new DeltaTable(
+ schemaTableName.getSchemaName(),
+ schemaTableName.getTableName(),
+ tableLocation,
+ Optional.of(snapshot.getVersion(deltaEngine.get())), // lock the snapshot version
+ getSchema(config, schemaTableName, deltaEngine.get(), snapshot)));
+ }
+
+ private Snapshot getSnapshot(
+ Table deltaTable,
+ Engine deltaEngine,
+ SchemaTableName schemaTableName,
+ Optional snapshotId,
+ Optional snapshotAsOfTimestampMillis)
+ {
// Fetch the snapshot info for given snapshot version. If no snapshot version is given, get the latest snapshot info.
// Lock the snapshot version here and use it later in the rest of the query (such as fetching file list etc.).
// If we don't lock the snapshot version here, the query may end up with schema from one version and data files from another
// version when the underlying delta table is changing while the query is running.
Snapshot snapshot;
if (snapshotId.isPresent()) {
- snapshot = getSnapshotById(deltaLog.get(), snapshotId.get(), schemaTableName);
+ snapshot = getSnapshotById(deltaTable, deltaEngine, snapshotId.get(), schemaTableName);
}
else if (snapshotAsOfTimestampMillis.isPresent()) {
- snapshot = getSnapshotAsOfTimestamp(deltaLog.get(), snapshotAsOfTimestampMillis.get(), schemaTableName);
+ snapshot = getSnapshotAsOfTimestamp(deltaTable, deltaEngine,
+ snapshotAsOfTimestampMillis.get(), schemaTableName);
}
else {
- snapshot = deltaLog.get().snapshot(); // get the latest snapshot
+ try {
+ snapshot = deltaTable.getLatestSnapshot(deltaEngine); // get the latest snapshot
+ }
+ catch (TableNotFoundException e) {
+ throw new PrestoException(StandardErrorCode.NOT_FOUND,
+ format("Could not move to latest snapshot on table '%s.%s'", schemaTableName.getSchemaName(),
+ schemaTableName.getTableName()), e);
+ }
}
- Metadata metadata = snapshot.getMetadata();
- String format = metadata.getFormat().getProvider();
- if (!PARQUET.name().equalsIgnoreCase(format)) {
- throw new PrestoException(DELTA_UNSUPPORTED_DATA_FORMAT,
- format("Delta table %s has unsupported data format: %s. Currently only Parquet data format is supported", schemaTableName, format));
+ if (snapshot instanceof SnapshotImpl) {
+ String format = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider();
+ if (!PARQUET.name().equalsIgnoreCase(format)) {
+ throw new PrestoException(DeltaErrorCode.DELTA_UNSUPPORTED_DATA_FORMAT,
+ format("Delta table %s has unsupported data format: %s. Only the Parquet data format is supported", schemaTableName, format));
+ }
}
-
- return Optional.of(new DeltaTable(
- schemaTableName.getSchemaName(),
- schemaTableName.getTableName(),
- tableLocation,
- Optional.of(snapshot.getVersion()), // lock the snapshot version
- getSchema(schemaTableName, metadata)));
+ return snapshot;
}
/**
@@ -116,26 +144,36 @@ else if (snapshotAsOfTimestampMillis.isPresent()) {
*
* @return Closeable iterator of files. It is responsibility of the caller to close the iterator.
*/
- public CloseableIterator listFiles(ConnectorSession session, DeltaTable deltaTable)
+ public CloseableIterator listFiles(ConnectorSession session, DeltaTable deltaTable)
{
+ requireNonNull(deltaTable, "deltaTable is null");
checkArgument(deltaTable.getSnapshotId().isPresent(), "Snapshot id is missing from the Delta table");
- Optional deltaLog = loadDeltaTableLog(
- session,
+ Optional deltaEngine = loadDeltaEngine(session,
new Path(deltaTable.getTableLocation()),
new SchemaTableName(deltaTable.getSchemaName(), deltaTable.getTableName()));
+ if (!deltaEngine.isPresent()) {
+ throw new PrestoException(DeltaErrorCode.DELTA_ERROR_LOADING_METADATA,
+ format("Could not obtain Delta engine in '%s'", deltaTable.getTableLocation()));
+ }
+ Table sourceTable = loadDeltaTable(deltaTable.getTableLocation(), deltaEngine.get());
- if (!deltaLog.isPresent()) {
- throw new PrestoException(NOT_FOUND,
- format("Delta table (%s.%s) no longer exists.", deltaTable.getSchemaName(), deltaTable.getTableName()));
+ if (!deltaTable.getSnapshotId().isPresent()) {
+ throw new PrestoException(DeltaErrorCode.DELTA_ERROR_LOADING_SNAPSHOT, "Could not obtain snapshot id");
}
- return deltaLog.get()
- .getSnapshotForVersionAsOf(deltaTable.getSnapshotId().get())
- .scan()
- .getFiles();
+ try {
+ return sourceTable.getSnapshotAsOfVersion(deltaEngine.get(),
+ deltaTable.getSnapshotId().get()).getScanBuilder(deltaEngine.get()).build()
+ .getScanFiles(deltaEngine.get());
+ }
+ catch (TableNotFoundException e) {
+ throw new PrestoException(StandardErrorCode.NOT_FOUND,
+ format("Delta table not found in '%s'", deltaTable.getTableLocation()), e);
+ }
}
- private Optional loadDeltaTableLog(ConnectorSession session, Path tableLocation, SchemaTableName schemaTableName)
+ private Optional loadDeltaEngine(ConnectorSession session, Path tableLocation,
+ SchemaTableName schemaTableName)
{
try {
HdfsContext hdfsContext = new HdfsContext(
@@ -148,63 +186,97 @@ private Optional loadDeltaTableLog(ConnectorSession session, Path tabl
if (!fileSystem.isDirectory(tableLocation)) {
return Optional.empty();
}
- return Optional.of(DeltaLog.forTable(
- hdfsEnvironment.getConfiguration(hdfsContext, tableLocation),
- tableLocation));
+ return Optional.of(DefaultEngine.create(fileSystem.getConf()));
}
catch (IOException ioException) {
- throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to load Delta table: " + ioException.getMessage(), ioException);
+ throw new PrestoException(DeltaErrorCode.DELTA_ERROR_LOADING_METADATA,
+ "Failed to load Delta table: " + ioException.getMessage(), ioException);
}
}
- private static Snapshot getSnapshotById(DeltaLog deltaLog, long snapshotId, SchemaTableName schemaTableName)
+ private Table loadDeltaTable(String tableLocation, Engine deltaEngine)
+ {
+ return Table.forPath(deltaEngine, tableLocation);
+ }
+
+ private static Snapshot getSnapshotById(Table deltaTable, Engine deltaEngine, long snapshotId, SchemaTableName schemaTableName)
{
try {
- return deltaLog.getSnapshotForVersionAsOf(snapshotId);
+ return deltaTable.getSnapshotAsOfVersion(deltaEngine, snapshotId);
}
catch (IllegalArgumentException exception) {
throw new PrestoException(
- NOT_FOUND,
+ StandardErrorCode.NOT_FOUND,
format("Snapshot version %d does not exist in Delta table '%s'.", snapshotId, schemaTableName),
exception);
}
+ catch (TableNotFoundException e) {
+ throw new PrestoException(StandardErrorCode.NOT_FOUND,
+ format(TABLE_NOT_FOUND_ERROR_TEMPLATE, schemaTableName.getSchemaName(),
+ schemaTableName.getTableName()));
+ }
}
- private static Snapshot getSnapshotAsOfTimestamp(DeltaLog deltaLog, long snapshotAsOfTimestampMillis, SchemaTableName schemaTableName)
+ private static Snapshot getSnapshotAsOfTimestamp(Table deltaTable, Engine deltaEngine,
+ long snapshotAsOfTimestampMillis, SchemaTableName schemaTableName)
{
try {
- return deltaLog.getSnapshotForTimestampAsOf(snapshotAsOfTimestampMillis);
+ return deltaTable.getSnapshotAsOfTimestamp(deltaEngine, snapshotAsOfTimestampMillis);
}
catch (IllegalArgumentException exception) {
throw new PrestoException(
- NOT_FOUND,
+ StandardErrorCode.NOT_FOUND,
format(
"There is no snapshot exists in Delta table '%s' that is created on or before '%s'",
schemaTableName,
Instant.ofEpochMilli(snapshotAsOfTimestampMillis)),
exception);
}
+ catch (TableNotFoundException e) {
+ throw new PrestoException(StandardErrorCode.NOT_FOUND,
+ format(TABLE_NOT_FOUND_ERROR_TEMPLATE, schemaTableName.getSchemaName(),
+ schemaTableName.getTableName()));
+ }
}
/**
* Utility method that returns the columns in given Delta metadata. Returned columns include regular and partition types.
* Data type from Delta is mapped to appropriate Presto data type.
*/
- private static List getSchema(SchemaTableName tableName, Metadata metadata)
+ private static List getSchema(DeltaConfig config, SchemaTableName tableName, Engine deltaEngine,
+ Snapshot snapshot)
{
- Set partitionColumns = metadata.getPartitionColumns().stream()
- .map(String::toLowerCase)
- .collect(Collectors.toSet());
-
- return Arrays.stream(metadata.getSchema().getFields())
- .map(field -> {
- String columnName = field.getName().toLowerCase(US);
- TypeSignature prestoType = DeltaTypeUtils.convertDeltaDataTypePrestoDataType(tableName, columnName, field.getDataType());
- return new DeltaColumn(
- columnName,
- prestoType,
- field.isNullable(),
- partitionColumns.contains(columnName));
- }).collect(Collectors.toList());
+ try (CloseableIterator columnBatches = snapshot.getScanBuilder(deltaEngine).build()
+ .getScanFiles(deltaEngine)) {
+ Row row = null;
+ while (columnBatches.hasNext()) {
+ CloseableIterator rows = columnBatches.next().getRows();
+ if (rows.hasNext()) {
+ row = rows.next();
+ break;
+ }
+ }
+ Map partitionValues = row != null ?
+ InternalScanFileUtils.getPartitionValues(row) : new HashMap<>(0);
+ return snapshot.getSchema(deltaEngine).fields().stream()
+ .map(field -> {
+ String columnName = config.isCaseSensitivePartitionsEnabled() ? field.getName() :
+ field.getName().toLowerCase(US);
+ TypeSignature prestoType = DeltaTypeUtils.convertDeltaDataTypePrestoDataType(tableName,
+ columnName, field.getDataType());
+ return new DeltaColumn(
+ columnName,
+ prestoType,
+ field.isNullable(),
+ partitionValues.containsKey(columnName));
+ }).collect(Collectors.toList());
+ }
+ catch (TableNotFoundException e) {
+ throw new PrestoException(StandardErrorCode.NOT_FOUND,
+ format(TABLE_NOT_FOUND_ERROR_TEMPLATE, tableName.getSchemaName(), tableName.getTableName()));
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException("Could not close columnar batch row", e);
+ }
}
}
diff --git a/presto-delta/src/main/java/com/facebook/presto/delta/DeltaConfig.java b/presto-delta/src/main/java/com/facebook/presto/delta/DeltaConfig.java
index eae39e34619e3..2a87649c69ad5 100644
--- a/presto-delta/src/main/java/com/facebook/presto/delta/DeltaConfig.java
+++ b/presto-delta/src/main/java/com/facebook/presto/delta/DeltaConfig.java
@@ -21,6 +21,7 @@ public class DeltaConfig
{
private int maxSplitsBatchSize = 200;
private boolean parquetDereferencePushdownEnabled = true;
+ private boolean caseSensitivePartitionsEnabled = true;
@NotNull
public boolean isParquetDereferencePushdownEnabled()
@@ -46,4 +47,16 @@ public DeltaConfig setMaxSplitsBatchSize(int maxSplitsBatchSize)
this.maxSplitsBatchSize = maxSplitsBatchSize;
return this;
}
+
+ public boolean isCaseSensitivePartitionsEnabled()
+ {
+ return this.caseSensitivePartitionsEnabled;
+ }
+
+ @Config("delta.case-sensitive-partitions-enabled")
+ public DeltaConfig setCaseSensitivePartitionsEnabled(boolean caseSensitivePartitionsEnabled)
+ {
+ this.caseSensitivePartitionsEnabled = caseSensitivePartitionsEnabled;
+ return this;
+ }
}
diff --git a/presto-delta/src/main/java/com/facebook/presto/delta/DeltaErrorCode.java b/presto-delta/src/main/java/com/facebook/presto/delta/DeltaErrorCode.java
index ab759e2ef975f..fb1e9436ff62d 100644
--- a/presto-delta/src/main/java/com/facebook/presto/delta/DeltaErrorCode.java
+++ b/presto-delta/src/main/java/com/facebook/presto/delta/DeltaErrorCode.java
@@ -31,7 +31,9 @@ public enum DeltaErrorCode
DELTA_CANNOT_OPEN_SPLIT(4, EXTERNAL),
DELTA_MISSING_DATA(5, EXTERNAL),
DELTA_READ_DATA_ERROR(6, INTERNAL_ERROR),
- DELTA_INVALID_PARTITION_VALUE(7, EXTERNAL);
+ DELTA_INVALID_PARTITION_VALUE(7, EXTERNAL),
+ DELTA_ERROR_LOADING_METADATA(8, EXTERNAL),
+ DELTA_ERROR_LOADING_SNAPSHOT(9, EXTERNAL);
private final ErrorCode errorCode;
diff --git a/presto-delta/src/main/java/com/facebook/presto/delta/DeltaExpressionUtils.java b/presto-delta/src/main/java/com/facebook/presto/delta/DeltaExpressionUtils.java
index 59f47e7d7d612..8ea44c160a591 100644
--- a/presto-delta/src/main/java/com/facebook/presto/delta/DeltaExpressionUtils.java
+++ b/presto-delta/src/main/java/com/facebook/presto/delta/DeltaExpressionUtils.java
@@ -13,6 +13,8 @@
*/
package com.facebook.presto.delta;
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.common.GenericInternalException;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
@@ -24,8 +26,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
-import io.delta.standalone.actions.AddFile;
-import io.delta.standalone.data.CloseableIterator;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.sql.Date;
@@ -34,12 +38,12 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
-import java.util.stream.Collectors;
import static com.facebook.presto.delta.DeltaColumnHandle.ColumnType.PARTITION;
import static com.facebook.presto.delta.DeltaErrorCode.DELTA_INVALID_PARTITION_VALUE;
import static com.facebook.presto.delta.DeltaErrorCode.DELTA_UNSUPPORTED_COLUMN_TYPE;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.utf8Slice;
import static java.lang.Double.doubleToRawLongBits;
import static java.lang.Double.parseDouble;
@@ -50,6 +54,7 @@
public final class DeltaExpressionUtils
{
+ private static final Logger logger = Logger.get(DeltaExpressionUtils.class);
private DeltaExpressionUtils()
{
}
@@ -64,18 +69,15 @@ public static List> splitPredicate(
ImmutableMap.Builder regularColumnPredicates = ImmutableMap.builder();
Optional