Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -42,6 +43,7 @@ public class TestHdfsFileSystemHdfs
extends AbstractTestTrinoFileSystem
{
private Hadoop hadoop;
private HdfsConfiguration hdfsConfiguration;
private HdfsEnvironment hdfsEnvironment;
private HdfsContext hdfsContext;
private TrinoFileSystem fileSystem;
Expand All @@ -53,7 +55,7 @@ void beforeAll()
hadoop.start();

HdfsConfig hdfsConfig = new HdfsConfig();
HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), emptySet());
hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), emptySet());
hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());
hdfsContext = new HdfsContext(ConnectorIdentity.ofUser("test"));

Expand Down Expand Up @@ -107,4 +109,33 @@ protected void verifyFileSystemIsEmpty()
throw new UncheckedIOException(e);
}
}

// Verifies that createDirectory() applies the configured new-directory
// permission when using a default HdfsConfig: the test asserts the created
// directory ends up with mode 0777 (the value compared against
// FsPermission.toOctal() below).
@Test
void testCreateDirectoryPermission()
throws IOException
{
// 777 = expected octal permission bits under the default HdfsConfig
assertCreateDirectoryPermission(fileSystem, hdfsEnvironment, (short) 777);
}

// Verifies createDirectory() behavior when new-directory permissions are
// configured to be skipped: no explicit mode is applied, so the directory is
// expected to carry 0755 instead of the default 0777.
@Test
void testCreateDirectoryPermissionWithSkip()
throws IOException
{
// Build a dedicated environment/file system for this test; the shared
// field-level ones use the default (non-skip) configuration. Distinct
// local names avoid shadowing the class fields of the same type.
HdfsConfig skipPermissionsConfig = new HdfsConfig()
.setNewDirectoryPermissions(HdfsConfig.SKIP_DIR_PERMISSIONS);
HdfsEnvironment skipPermissionsEnvironment = new HdfsEnvironment(hdfsConfiguration, skipPermissionsConfig, new NoHdfsAuthentication());
TrinoFileSystem skipPermissionsFileSystem = new HdfsFileSystem(skipPermissionsEnvironment, hdfsContext, new TrinoHdfsFileSystemStats());

// 755 = expected octal permission bits when permission setting is skipped
assertCreateDirectoryPermission(skipPermissionsFileSystem, skipPermissionsEnvironment, (short) 755);
}

// Creates "<root>/test" through the given TrinoFileSystem and asserts that
// the resulting directory's permission bits (as FsPermission.toOctal())
// equal the expected value. Permissions are read back through the underlying
// Hadoop FileSystem obtained from the HdfsEnvironment.
private void assertCreateDirectoryPermission(TrinoFileSystem fileSystem, HdfsEnvironment hdfsEnvironment, short permission)
throws IOException
{
Location directory = getRootLocation().appendPath("test");
fileSystem.createDirectory(directory);

// Re-resolve the location as a raw Hadoop Path to inspect its FileStatus
Path hadoopPath = new Path(directory.toString());
FileStatus status = hdfsEnvironment.getFileSystem(hdfsContext, hadoopPath).getFileStatus(hadoopPath);
assertThat(status.getPermission().toOctal()).isEqualTo(permission);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.base.projection.ApplyProjectionUtil;
Expand Down Expand Up @@ -127,7 +126,6 @@
import io.trino.spi.type.VarcharType;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.hadoop.fs.Path;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -1833,7 +1831,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
tableStatistics = new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of());
}

Optional<Path> writePath = Optional.of(new Path(writeInfo.writePath().toString()));
Optional<Location> writePath = Optional.of(writeInfo.writePath());
if (handle.getPartitionedBy().isEmpty()) {
List<String> fileNames;
if (partitionUpdates.isEmpty()) {
Expand Down Expand Up @@ -2208,7 +2206,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
session,
table,
principalPrivileges,
Optional.of(new Path(partitionUpdate.getWritePath().toString())),
Optional.of(partitionUpdate.getWritePath()),
Optional.of(partitionUpdate.getFileNames()),
false,
partitionStatistics,
Expand Down Expand Up @@ -2268,8 +2266,8 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
if (handle.getLocationHandle().getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
removeNonCurrentQueryFiles(session, partitionUpdate.getTargetPath());
if (handle.isRetriesEnabled()) {
HdfsContext hdfsContext = new HdfsContext(session);
cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, session.getQueryId(), partitionUpdate.getTargetPath(), ImmutableSet.copyOf(partitionUpdate.getFileNames()));
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
cleanExtraOutputFiles(fileSystem, session.getQueryId(), partitionUpdate.getTargetPath(), ImmutableSet.copyOf(partitionUpdate.getFileNames()));
}
}
else {
Expand Down Expand Up @@ -2303,7 +2301,6 @@ private void removeNonCurrentQueryFiles(ConnectorSession session, Location parti
String queryId = session.getQueryId();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
try {
// use TrinoFileSystem instead of Hadoop file system
FileIterator iterator = fileSystem.listFiles(partitionLocation);
while (iterator.hasNext()) {
Location location = iterator.next().location();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm

DirectoryLister directoryLister = transactionScopeCachingDirectoryListerFactory.get(this.directoryLister);
SemiTransactionalHiveMetastore metastore = new SemiTransactionalHiveMetastore(
hdfsEnvironment,
fileSystemFactory,
hiveMetastoreClosure,
fileSystemExecutor,
dropExecutor,
Expand Down
Loading