Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.plugin.hive.authentication.HiveIdentity;
Expand All @@ -42,6 +43,7 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
Expand All @@ -53,6 +55,7 @@ public abstract class BaseTestHiveOnDataLake
extends AbstractTestQueryFramework
{
private static final String HIVE_TEST_SCHEMA = "hive_insert_overwrite";
private static final DataSize HIVE_S3_STREAMING_PART_SIZE = DataSize.of(5, MEGABYTE);

private String bucketName;
private HiveMinioDataLake dockerizedS3DataLake;
Expand Down Expand Up @@ -99,7 +102,7 @@ protected QueryRunner createQueryRunner()
.put("hive.metastore-cache-ttl", "1d")
.put("hive.metastore-refresh-interval", "1d")
// This is required to reduce memory pressure to test writing large files
.put("hive.s3.streaming.part-size", "5MB")
.put("hive.s3.streaming.part-size", HIVE_S3_STREAMING_PART_SIZE.toString())
.buildOrThrow())
// Increased timeout due to occasional slower responses in such setup
.setMetastoreTimeout(new Duration(20, TimeUnit.SECONDS))
Expand Down Expand Up @@ -251,9 +254,8 @@ public void testFlushPartitionCache()
computeActual(format("DROP TABLE %s", fullyQualifiedTestTableName));
}

// TODO(https://github.com/trinodb/trino/issues/11659) reenable
@Test(enabled = false)
public void testWriteLargeFiles()
@Test
public void testWriteDifferentSizes()
{
String testTable = getTestTableName();
computeActual(format(
Expand All @@ -263,27 +265,19 @@ public void testWriteLargeFiles()
" regionkey bigint) " +
" WITH (partitioned_by=ARRAY['regionkey'])",
testTable));
String largeRowExpression = "array_join(transform(sequence(1, 70), x-> array_join(repeat(comment, 1000), '')), '')";
computeActual(format("INSERT INTO " + testTable + " SELECT %s, %s, regionkey FROM tpch.tiny.nation WHERE regionkey = 1", largeRowExpression, largeRowExpression));

long partSizeInBytes = HIVE_S3_STREAMING_PART_SIZE.toBytes();

// Exercise different code paths of Hive S3 streaming upload, with upload part size 5MB:
// 1. fileSize <= 5MB (direct upload)
testWriteWithFileSize(testTable, 50, 0, partSizeInBytes);

// 2. 5MB < fileSize <= 10MB (upload in two parts)
testWriteWithFileSize(testTable, 100, partSizeInBytes + 1, partSizeInBytes * 2);

// 3. fileSize > 10MB (upload in three or more parts)
query(format("SELECT DISTINCT (\"$file_size\" - 1) / (5 * 1024 * 1024) FROM %s ORDER BY 1", testTable))
.assertThat()
.skippingTypesCheck()
.containsAll(resultBuilder(getSession())
.row(0L)
.row(1L)
.row(2L)
.build());
query(format("SELECT SUM(LENGTH(col1)) FROM %s", testTable))
.assertThat()
.skippingTypesCheck()
.containsAll(resultBuilder(getSession())
.row(500L * 70 * 1000)
.build());
testWriteWithFileSize(testTable, 150, partSizeInBytes * 2 + 1, partSizeInBytes * 3);

computeActual(format("DROP TABLE %s", testTable));
}

Expand Down Expand Up @@ -402,4 +396,22 @@ protected void copyTpchNationToTable(String testTable)
{
computeActual(format("INSERT INTO " + testTable + " SELECT name, comment, nationkey, regionkey FROM tpch.tiny.nation"));
}

private void testWriteWithFileSize(String testTable, int scaleFactorInThousands, long fileSizeRangeStart, long fileSizeRangeEnd)
{
String scaledColumnExpression = format("array_join(transform(sequence(1, %d), x-> array_join(repeat(comment, 1000), '')), '')", scaleFactorInThousands);
computeActual(format("INSERT INTO " + testTable + " SELECT %s, %s, regionkey FROM tpch.tiny.nation WHERE nationkey = 9", scaledColumnExpression, scaledColumnExpression));
query(format("SELECT length(col1) FROM %s", testTable))
.assertThat()
.skippingTypesCheck()
.containsAll(resultBuilder(getSession())
.row(114L * scaleFactorInThousands * 1000)
.build());
query(format("SELECT \"$file_size\" BETWEEN %d AND %d FROM %s", fileSizeRangeStart, fileSizeRangeEnd, testTable))
.assertThat()
.skippingTypesCheck()
.containsAll(resultBuilder(getSession())
.row(true)
.build());
}
}