Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.deltalake.expression.SparkExpressionParser;
import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore;
Expand Down Expand Up @@ -218,13 +216,9 @@
import static io.trino.plugin.hive.util.HiveClassNames.SEQUENCEFILE_INPUT_FORMAT_CLASS;
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.plugin.hive.util.HiveWriteUtils.createDirectory;
import static io.trino.plugin.hive.util.HiveWriteUtils.isS3FileSystem;
import static io.trino.plugin.hive.util.HiveWriteUtils.pathExists;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
Expand Down Expand Up @@ -298,7 +292,6 @@ public class DeltaLakeMetadata

private final DeltaLakeMetastore metastore;
private final TrinoFileSystemFactory fileSystemFactory;
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final AccessControlMetadata accessControlMetadata;
private final TrinoViewHiveMetastore trinoViewHiveMetastore;
Expand All @@ -321,7 +314,6 @@ public class DeltaLakeMetadata
public DeltaLakeMetadata(
DeltaLakeMetastore metastore,
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
AccessControlMetadata accessControlMetadata,
TrinoViewHiveMetastore trinoViewHiveMetastore,
Expand All @@ -341,7 +333,6 @@ public DeltaLakeMetadata(
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.accessControlMetadata = requireNonNull(accessControlMetadata, "accessControlMetadata is null");
this.trinoViewHiveMetastore = requireNonNull(trinoViewHiveMetastore, "trinoViewHiveMetastore is null");
Expand Down Expand Up @@ -710,7 +701,6 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
external = false;
}
Path targetPath = new Path(location);
ensurePathExists(session, targetPath);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check can be skipped because directory is created implicitly when first file is created.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be part of the commit message

Path deltaLogDirectory = getTransactionLogDir(targetPath);
Optional<Long> checkpointInterval = getCheckpointInterval(tableMetadata.getProperties());
Optional<Boolean> changeDataFeedEnabled = getChangeDataFeedEnabled(tableMetadata.getProperties());
Expand Down Expand Up @@ -766,7 +756,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to access file system for: " + location, e);
}

Table table = buildTable(session, schemaTableName, location, targetPath, external);
Table table = buildTable(session, schemaTableName, location, external);

PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow());
metastore.createTable(
Expand All @@ -775,7 +765,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
principalPrivileges);
}

public static Table buildTable(ConnectorSession session, SchemaTableName schemaTableName, String location, Path targetPath, boolean isExternal)
public static Table buildTable(ConnectorSession session, SchemaTableName schemaTableName, String location, boolean isExternal)
{
Table.Builder tableBuilder = Table.builder()
.setDatabaseName(schemaTableName.getSchemaName())
Expand All @@ -785,7 +775,7 @@ public static Table buildTable(ConnectorSession session, SchemaTableName schemaT
.setDataColumns(DUMMY_DATA_COLUMNS)
.setParameters(deltaTableProperties(session, location, isExternal));

setDeltaStorageFormat(tableBuilder, location, targetPath);
setDeltaStorageFormat(tableBuilder, location);
return tableBuilder.build();
}

Expand All @@ -808,29 +798,13 @@ private static Map<String, String> deltaTableProperties(ConnectorSession session
return properties.buildOrThrow();
}

private static void setDeltaStorageFormat(Table.Builder tableBuilder, String location, Path targetPath)
private static void setDeltaStorageFormat(Table.Builder tableBuilder, String location)
{
tableBuilder.getStorageBuilder()
// this mimics what Databricks is doing when creating a Delta table in the Hive metastore
.setStorageFormat(DELTA_STORAGE_FORMAT)
.setSerdeParameters(ImmutableMap.of(PATH_PROPERTY, location))
.setLocation(targetPath.toString());
}

private Path getExternalPath(HdfsContext context, String location)
{
try {
Path path = new Path(location);
if (!isS3FileSystem(context, hdfsEnvironment, path)) {
if (!hdfsEnvironment.getFileSystem(context, path).getFileStatus(path).isDirectory()) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "External location must be a directory: " + location);
}
}
return path;
}
catch (IllegalArgumentException | IOException e) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI: " + location, e);
}
.setLocation(location);
}

@Override
Expand Down Expand Up @@ -858,7 +832,6 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
external = false;
}
Path targetPath = new Path(location);
ensurePathExists(session, targetPath);
checkPathContainsNoFiles(session, targetPath);

setRollback(() -> deleteRecursivelyIfExists(fileSystemFactory.create(session), targetPath));
Expand All @@ -884,14 +857,6 @@ private Optional<String> getSchemaLocation(Database database)
return schemaLocation;
}

private void ensurePathExists(ConnectorSession session, Path directoryPath)
{
HdfsContext hdfsContext = new HdfsContext(session);
if (!pathExists(hdfsContext, hdfsEnvironment, directoryPath)) {
createDirectory(hdfsContext, hdfsEnvironment, directoryPath);
}
}

private void checkPathContainsNoFiles(ConnectorSession session, Path targetPath)
{
try {
Expand Down Expand Up @@ -961,7 +926,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
.map(dataFileInfoCodec::fromJson)
.collect(toImmutableList());

Table table = buildTable(session, schemaTableName(schemaName, tableName), location, getExternalPath(new HdfsContext(session), location), handle.isExternal());
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be redundant check, as individual files (and implicitly directories) are created at this point. If location would be file it should fail during directory creation earlier.

Table table = buildTable(session, schemaTableName(schemaName, tableName), location, handle.isExternal());
// Ensure the table has queryId set. This is relied on for exception handling
String queryId = session.getQueryId();
verify(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import io.airlift.json.JsonCodec;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore;
import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
Expand All @@ -41,7 +40,6 @@ public class DeltaLakeMetadataFactory
{
private final HiveMetastoreFactory hiveMetastoreFactory;
private final TrinoFileSystemFactory fileSystemFactory;
private final HdfsEnvironment hdfsEnvironment;
private final TransactionLogAccess transactionLogAccess;
private final TypeManager typeManager;
private final DeltaLakeAccessControlMetadataFactory accessControlMetadataFactory;
Expand All @@ -66,7 +64,6 @@ public class DeltaLakeMetadataFactory
public DeltaLakeMetadataFactory(
HiveMetastoreFactory hiveMetastoreFactory,
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
TransactionLogAccess transactionLogAccess,
TypeManager typeManager,
DeltaLakeAccessControlMetadataFactory accessControlMetadataFactory,
Expand All @@ -83,7 +80,6 @@ public DeltaLakeMetadataFactory(
{
this.hiveMetastoreFactory = requireNonNull(hiveMetastoreFactory, "hiveMetastore is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.accessControlMetadataFactory = requireNonNull(accessControlMetadataFactory, "accessControlMetadataFactory is null");
Expand Down Expand Up @@ -125,7 +121,6 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
return new DeltaLakeMetadata(
deltaLakeMetastore,
fileSystemFactory,
hdfsEnvironment,
typeManager,
accessControlMetadata,
trinoViewHiveMetastore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void doRegisterTable(
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, format("Failed checking table location %s", tableLocation), e);
}

Table table = buildTable(session, schemaTableName, tableLocation, new Path(tableLocation), true);
Table table = buildTable(session, schemaTableName, tableLocation, true);

PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow());
metastore.createTable(
Expand Down