Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.common.collect.Iterators;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
Expand All @@ -37,8 +36,10 @@ public class GcsFileIterator
/**
 * Creates an iterator over the file entries of a GCS blob listing.
 * <p>
 * Blobs reported as directories and blobs whose name ends with a slash
 * are skipped, so they are never returned as file entries.
 *
 * @param location the listed location; its base is used to build the entry locations
 * @param page the blob listing page to iterate from
 * @throws NullPointerException if {@code location} or {@code page} is null
 */
public GcsFileIterator(GcsLocation location, Page<Blob> page)
{
    this.location = requireNonNull(location, "location is null");
    requireNonNull(page, "page is null");
    // Page::streamAll handles paging internally
    this.blobIterator = page.streamAll()
            .filter(blob -> !blob.isDirectory())
            // a blob whose name ends with a slash is not treated as a file
            .filter(blob -> !blob.getName().endsWith("/"))
            .iterator();
}

@Override
Expand All @@ -49,7 +50,7 @@ public boolean hasNext()
return blobIterator.hasNext();
}
catch (RuntimeException e) {
throw handleGcsException(e, "iterate files", location);
throw handleGcsException(e, "listing files", location);
}
}

Expand All @@ -63,7 +64,7 @@ public FileEntry next()
return new FileEntry(Location.of(location.getBase() + blob.getName()), length, Instant.from(blob.getUpdateTimeOffsetDateTime()), Optional.empty());
}
catch (RuntimeException e) {
throw handleGcsException(e, "iterate files", location);
throw handleGcsException(e, "listing files", location);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void deleteDirectory(Location location)
getFutureValue(Futures.allAsList(batchFutures));
}
catch (RuntimeException e) {
throw handleGcsException(e, "delete directory", gcsLocation);
throw handleGcsException(e, "deleting directory", gcsLocation);
}
}

Expand All @@ -161,7 +161,7 @@ public FileIterator listFiles(Location location)
return new GcsFileIterator(gcsLocation, getPage(gcsLocation));
}
catch (RuntimeException e) {
throw handleGcsException(e, "list files", gcsLocation);
throw handleGcsException(e, "listing files", gcsLocation);
}
}

Expand Down Expand Up @@ -258,7 +258,7 @@ public Set<Location> listDirectories(Location location)
return locationBuilder.build();
}
catch (RuntimeException e) {
throw handleGcsException(e, "list directories", gcsLocation);
throw handleGcsException(e, "listing directories", gcsLocation);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public int readTail(byte[] buffer, int bufferOffset, int bufferLength)
return readNBytes(readChannel, buffer, bufferOffset, bufferLength);
}
catch (RuntimeException e) {
throw handleGcsException(e, "read file", location);
throw handleGcsException(e, "reading file", location);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private void openStream()
this.readChannel = getReadChannel(blob, location, 0L, readBlockSizeBytes, predeclaredLength);
}
catch (RuntimeException e) {
throw handleGcsException(e, "read file", location);
throw handleGcsException(e, "reading file", location);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ private void writeDirect(ByteBuffer buffer)
int bytesWritten = 0;
try {
bytesWritten = writeChannel.write(buffer);
if (bytesWritten != buffer.remaining()) {
throw new IOException("Unexpected bytes written length: %s should be %s".formatted(bytesWritten, buffer.remaining()));
if (buffer.remaining() != 0) {
throw new IOException("Unexpected partial write (written=%s, remaining=%s)".formatted(bytesWritten, buffer.remaining()));
}
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.filesystem.gcs;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.testing.RemoteStorageHelper;
Expand All @@ -22,11 +24,13 @@
import io.trino.spi.security.ConnectorIdentity;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.io.IOException;
import java.util.Base64;

import static com.google.cloud.storage.Storage.BlobTargetOption.doesNotExist;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -126,4 +130,26 @@ protected final boolean supportsRenameFile()
{
return false;
}

@Test
void testExistingFileWithTrailingSlash()
        throws IOException
{
    // create an empty blob whose name ends with a slash, directly through the storage client
    String bucket = new GcsLocation(rootLocation).bucket();
    BlobId slashBlobId = BlobId.of(bucket, "data/file/");
    BlobInfo slashBlob = BlobInfo.newBuilder(slashBlobId).build();
    storage.create(slashBlob, new byte[0], doesNotExist());
    try {
        // the blob is not reported as a file
        assertThat(fileSystem.listFiles(getRootLocation()).hasNext()).isFalse();

        // but its path components are reported as directories
        Location dataDirectory = getRootLocation().appendPath("data/");
        assertThat(fileSystem.listDirectories(getRootLocation())).containsExactly(dataDirectory);
        Location fileDirectory = dataDirectory.appendPath("file/");
        assertThat(fileSystem.listDirectories(dataDirectory)).containsExactly(fileDirectory);

        // deleting the directory removes the slash-terminated blob, even though file listings do not show it
        fileSystem.deleteDirectory(dataDirectory);
        assertThat(fileSystem.listDirectories(getRootLocation())).isEmpty();
    }
    finally {
        // clean up in case the test failed before the directory was deleted
        storage.delete(slashBlobId);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ void testExistingFileWithTrailingSlash()
assertThat(fileSystem.listDirectories(getRootLocation())).containsExactly(data);
assertThat(fileSystem.listDirectories(data)).containsExactly(data.appendPath("file/"));

// blobs ending in slash are invisible to S3FileSystem and will not be deleted
fileSystem.deleteDirectory(data);
assertThat(fileSystem.listDirectories(getRootLocation())).containsExactly(data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ void renameDirectory(Location source, Location target)

/**
* Lists all directories that are direct descendants of the specified directory.
* The location can be empty, in which case all directories at the root of the file system are returned.
* Otherwise, the location must end with a slash.
* If the location does not exist, an empty set is returned.
* <p>
* For hierarchical file systems, if the path is not a directory, an exception is raised.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.file.FileAlreadyExistsException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -559,6 +560,31 @@ void testOutputStreamByteAtATime()
}
}

@Test
void testOutputStreamLargeWrites()
        throws IOException
{
    int chunkCount = 8;
    int chunkSize = MEGABYTE / 2;

    try (TempBlob tempBlob = randomBlobLocation("inputStream")) {
        // write the chunks, each one filled with its own index as the byte value
        try (OutputStream out = tempBlob.outputFile().create()) {
            for (int chunk = 0; chunk < chunkCount; chunk++) {
                byte[] data = new byte[chunkSize];
                Arrays.fill(data, (byte) chunk);
                out.write(data);
            }
        }

        // read the chunks back in order and compare them with what was written
        try (TrinoInputStream in = tempBlob.inputFile().newStream()) {
            for (int chunk = 0; chunk < chunkCount; chunk++) {
                byte[] expected = new byte[chunkSize];
                Arrays.fill(expected, (byte) chunk);
                byte[] actual = in.readNBytes(chunkSize);
                assertThat(actual.length).isEqualTo(chunkSize);
                assertThat(actual).isEqualTo(expected);
            }
        }
    }
}

@Test
public void testPaths()
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Hadoop()
protected void setupContainer()
{
super.setupContainer();
withLogConsumer(new PrintingLogConsumer("hadoop | "));
withLogConsumer(new PrintingLogConsumer("Hadoop"));
withRunCommand(List.of("bash", "-e", "-c", """
rm /etc/supervisord.d/{hive*,mysql*,socks*,sshd*,yarn*}.conf
supervisord -c /etc/supervisord.conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected void registerTableFromResources(String table, String resourcePath, Que
String targetDirectory = bucketName + "/" + table;

try {
List<ClassPath.ResourceInfo> resources = ClassPath.from(TestDeltaLakeAdlsConnectorSmokeTest.class.getClassLoader())
List<ClassPath.ResourceInfo> resources = ClassPath.from(getClass().getClassLoader())
.getResources()
.stream()
.filter(resourceInfo -> resourceInfo.getResourceName().startsWith(resourcePath + "/"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.hdfs.gcs.GoogleGcsConfigurationInitializer;
import io.trino.hdfs.gcs.HiveGcsConfig;
import io.trino.plugin.hive.containers.HiveHadoop;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -47,7 +50,7 @@
import java.util.regex.Pattern;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.getConnectorService;
import static io.trino.plugin.hive.containers.HiveHadoop.HIVE3_IMAGE;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -143,7 +146,9 @@ protected HiveHadoop createHiveHadoop()
protected Map<String, String> hiveStorageConfiguration()
{
return ImmutableMap.<String, String>builder()
.put("hive.gcs.json-key", gcpCredentials)
.put("fs.hadoop.enabled", "false")
.put("fs.native-gcs.enabled", "true")
.put("gcs.json-key", gcpCredentials)
.buildOrThrow();
}

Expand All @@ -160,12 +165,13 @@ protected Map<String, String> deltaStorageConfiguration()
@Override
protected void registerTableFromResources(String table, String resourcePath, QueryRunner queryRunner)
{
this.fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(queryRunner.getDefaultSession().toConnectorSession());
this.fileSystem = getConnectorService((DistributedQueryRunner) queryRunner, TrinoFileSystemFactory.class)
.create(ConnectorIdentity.ofUser("test"));

String targetDirectory = bucketUrl() + table;

try {
List<ClassPath.ResourceInfo> resources = ClassPath.from(TestDeltaLakeAdlsConnectorSmokeTest.class.getClassLoader())
List<ClassPath.ResourceInfo> resources = ClassPath.from(getClass().getClassLoader())
.getResources()
.stream()
.filter(resourceInfo -> resourceInfo.getResourceName().startsWith(resourcePath + "/"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.util.Optional;
import java.util.Set;

import static java.lang.String.format;

public class HiveHadoop
extends BaseTestContainer
{
Expand Down Expand Up @@ -73,7 +71,7 @@ protected void setupContainer()
ImmutableList.of(
"/bin/bash",
runCmd));
withLogConsumer(new PrintingLogConsumer(format("%-20s| ", "hadoop")));
withLogConsumer(new PrintingLogConsumer("Hadoop"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ protected QueryRunner createQueryRunner()
return IcebergQueryRunner.builder()
.setIcebergProperties(ImmutableMap.<String, String>builder()
.put("iceberg.catalog.type", "hive_metastore")
.put("hive.gcs.json-key", gcpCredentials)
.put("fs.hadoop.enabled", "false")
.put("fs.native-gcs.enabled", "true")
.put("gcs.json-key", gcpCredentials)
.put("hive.metastore.uri", "thrift://" + hiveHadoop.getHiveMetastoreEndpoint())
.put("iceberg.file-format", format.name())
.put("iceberg.register-table-procedure.enabled", "true")
Expand Down
16 changes: 9 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2671,13 +2671,15 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<properties>
<configurationParameters>junit.jupiter.execution.timeout.thread.mode.default = SEPARATE_THREAD
junit.jupiter.extensions.autodetection.enabled = true
junit.jupiter.execution.parallel.enabled = true
junit.jupiter.execution.parallel.mode.default = concurrent
junit.jupiter.execution.parallel.mode.classes.default = concurrent</configurationParameters>
</properties>
<!-- TODO: these should use https://junit.org/junit5/docs/current/user-guide/#running-tests-build-maven-config-params -->
<!-- use system properties to work around https://youtrack.jetbrains.com/issue/IDEA-339034 -->
<systemPropertyVariables>
<junit.jupiter.execution.timeout.thread.mode.default>SEPARATE_THREAD</junit.jupiter.execution.timeout.thread.mode.default>
<junit.jupiter.extensions.autodetection.enabled>true</junit.jupiter.extensions.autodetection.enabled>
<junit.jupiter.execution.parallel.enabled>true</junit.jupiter.execution.parallel.enabled>
<junit.jupiter.execution.parallel.mode.default>concurrent</junit.jupiter.execution.parallel.mode.default>
<junit.jupiter.execution.parallel.mode.classes.default>concurrent</junit.jupiter.execution.parallel.mode.classes.default>
</systemPropertyVariables>
<includes>
<!-- Tests classes should start with "Test", but we do also want to include tests incorrectly named, with "Test" at the end -->
<include>**/Test*.java</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void createBucket(String bucketName)
public void copyResources(String resourcePath, String bucketName, String target)
{
try (MinioClient minioClient = createMinioClient()) {
for (ClassPath.ResourceInfo resourceInfo : ClassPath.from(MinioClient.class.getClassLoader())
for (ClassPath.ResourceInfo resourceInfo : ClassPath.from(getClass().getClassLoader())
.getResources()) {
if (resourceInfo.getResourceName().startsWith(resourcePath)) {
String fileName = resourceInfo.getResourceName().replaceFirst("^" + Pattern.quote(resourcePath), quoteReplacement(target));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@
import org.testcontainers.containers.output.BaseConsumer;
import org.testcontainers.containers.output.OutputFrame;

import static java.util.Objects.requireNonNull;
import static org.testcontainers.containers.output.OutputFrame.OutputType.END;

public final class PrintingLogConsumer
extends BaseConsumer<PrintingLogConsumer>
{
private static final Logger log = Logger.get(PrintingLogConsumer.class);
private final Logger log;

private final String prefix;

public PrintingLogConsumer(String prefix)
public PrintingLogConsumer(String name)
{
this.prefix = requireNonNull(prefix, "prefix is null");
this.log = Logger.get("container." + name);
}

@Override
Expand All @@ -41,10 +38,10 @@ public void accept(OutputFrame outputFrame)
// remove new line characters
String message = outputFrame.getUtf8String().replaceAll("\\r?\\n?$", "");
if (!message.isEmpty() || outputFrame.getType() != END) {
log.info("%s%s", prefix, message);
log.info(message);
}
if (outputFrame.getType() == END) {
log.info("%s(exited)", prefix);
log.info("(exited)");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void copyResourcePath(String bucket, String resourcePath, String target)
ensureBucketExists(bucket);

try {
ClassPath.from(MinioClient.class.getClassLoader())
ClassPath.from(getClass().getClassLoader())
.getResources().stream()
.filter(resourceInfo -> resourceInfo.getResourceName().startsWith(resourcePath))
.forEach(resourceInfo -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ private static TestingTrinoServer createTestingTrinoServer(
.build();

String nodeRole = coordinator ? "coordinator" : "worker";
log.info("Created %s TestingTrinoServer in %s: %s", nodeRole, nanosSince(start).convertToMostSuccinctTimeUnit(), server.getBaseUrl());
log.info("Created TestingTrinoServer %s in %s: %s", nodeRole, nanosSince(start).convertToMostSuccinctTimeUnit(), server.getBaseUrl());

return server;
}
Expand Down