diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java
index 30ced977118b..e2ed7b7940aa 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java
@@ -16,6 +16,7 @@
 import com.amazonaws.services.s3.AmazonS3;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import io.trino.Session;
 import io.trino.plugin.hive.authentication.HiveIdentity;
@@ -42,6 +43,7 @@
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
 import static io.trino.testing.MaterializedResult.resultBuilder;
 import static io.trino.testing.sql.TestTable.randomTableSuffix;
 import static java.lang.String.format;
@@ -53,6 +55,7 @@ public abstract class BaseTestHiveOnDataLake
         extends AbstractTestQueryFramework
 {
     private static final String HIVE_TEST_SCHEMA = "hive_insert_overwrite";
+    private static final DataSize HIVE_S3_STREAMING_PART_SIZE = DataSize.of(5, MEGABYTE);
 
     private String bucketName;
     private HiveMinioDataLake dockerizedS3DataLake;
@@ -99,7 +102,7 @@ protected QueryRunner createQueryRunner()
                                 .put("hive.metastore-cache-ttl", "1d")
                                 .put("hive.metastore-refresh-interval", "1d")
                                 // This is required to reduce memory pressure to test writing large files
-                                .put("hive.s3.streaming.part-size", "5MB")
+                                .put("hive.s3.streaming.part-size", HIVE_S3_STREAMING_PART_SIZE.toString())
                                 .buildOrThrow())
                 // Increased timeout due to occasional slower responses in such setup
                 .setMetastoreTimeout(new Duration(20, TimeUnit.SECONDS))
@@ -251,9 +254,8 @@ public void testFlushPartitionCache()
         computeActual(format("DROP TABLE %s", fullyQualifiedTestTableName));
     }
 
-    // TODO(https://github.com/trinodb/trino/issues/11659) reenable
-    @Test(enabled = false)
-    public void testWriteLargeFiles()
+    @Test
+    public void testWriteDifferentSizes()
     {
         String testTable = getTestTableName();
         computeActual(format(
@@ -263,27 +265,19 @@ public void testWriteLargeFiles()
                         "    regionkey bigint) " +
                         "    WITH (partitioned_by=ARRAY['regionkey'])",
                 testTable));
-        String largeRowExpression = "array_join(transform(sequence(1, 70), x-> array_join(repeat(comment, 1000), '')), '')";
-        computeActual(format("INSERT INTO " + testTable + " SELECT %s, %s, regionkey FROM tpch.tiny.nation WHERE regionkey = 1", largeRowExpression, largeRowExpression));
+
+        long partSizeInBytes = HIVE_S3_STREAMING_PART_SIZE.toBytes();
+
         // Exercise different code paths of Hive S3 streaming upload, with upload part size 5MB:
         // 1. fileSize <= 5MB (direct upload)
+        testWriteWithFileSize(testTable, 50, 0, partSizeInBytes);
+
         // 2. 5MB < fileSize <= 10MB (upload in two parts)
+        testWriteWithFileSize(testTable, 100, partSizeInBytes + 1, partSizeInBytes * 2);
+
         // 3. fileSize > 10MB (upload in three or more parts)
-        query(format("SELECT DISTINCT (\"$file_size\" - 1) / (5 * 1024 * 1024) FROM %s ORDER BY 1", testTable))
-                .assertThat()
-                .skippingTypesCheck()
-                .containsAll(resultBuilder(getSession())
-                        .row(0L)
-                        .row(1L)
-                        .row(2L)
-                        .build());
-        query(format("SELECT SUM(LENGTH(col1)) FROM %s", testTable))
-                .assertThat()
-                .skippingTypesCheck()
-                .containsAll(resultBuilder(getSession())
-                        .row(500L * 70 * 1000)
-                        .build());
+        testWriteWithFileSize(testTable, 150, partSizeInBytes * 2 + 1, partSizeInBytes * 3);
+
         computeActual(format("DROP TABLE %s", testTable));
     }
 
@@ -402,4 +396,22 @@ protected void copyTpchNationToTable(String testTable)
     {
         computeActual(format("INSERT INTO " + testTable + " SELECT name, comment, nationkey, regionkey FROM tpch.tiny.nation"));
     }
+
+    private void testWriteWithFileSize(String testTable, int scaleFactorInThousands, long fileSizeRangeStart, long fileSizeRangeEnd)
+    {
+        String scaledColumnExpression = format("array_join(transform(sequence(1, %d), x-> array_join(repeat(comment, 1000), '')), '')", scaleFactorInThousands);
+        computeActual(format("INSERT INTO " + testTable + " SELECT %s, %s, regionkey FROM tpch.tiny.nation WHERE nationkey = 9", scaledColumnExpression, scaledColumnExpression));
+        query(format("SELECT length(col1) FROM %s", testTable))
+                .assertThat()
+                .skippingTypesCheck()
+                .containsAll(resultBuilder(getSession())
+                        .row(114L * scaleFactorInThousands * 1000)
+                        .build());
+        query(format("SELECT \"$file_size\" BETWEEN %d AND %d FROM %s", fileSizeRangeStart, fileSizeRangeEnd, testTable))
+                .assertThat()
+                .skippingTypesCheck()
+                .containsAll(resultBuilder(getSession())
+                        .row(true)
+                        .build());
+    }
 }
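
For context, outside the patch itself: the three cases commented in testWriteDifferentSizes follow from how the streaming upload splits a file into parts, i.e. a file of a given size needs ceil(fileSize / partSize) upload parts. Below is a minimal standalone sketch of that arithmetic, assuming the 5 MB (5 * 1024 * 1024 bytes) part size configured above; the class name and the sample file sizes are illustrative only and are not part of the change.

// Sketch only, not part of the patch. Shows why the three asserted file-size
// ranges correspond to a direct (single-part) upload, a two-part upload, and
// an upload of three or more parts.
public class StreamingPartCountSketch
{
    public static void main(String[] args)
    {
        // Same value as DataSize.of(5, MEGABYTE).toBytes() used by the test (airlift DataSize units are binary)
        long partSize = 5L * 1024 * 1024;

        // Illustrative file sizes landing in each of the three ranges exercised by the test
        for (long fileSize : new long[] {3_000_000L, 8_000_000L, 12_000_000L}) {
            long parts = (fileSize + partSize - 1) / partSize; // ceiling division
            System.out.printf("fileSize=%,d bytes -> %d upload part(s)%n", fileSize, parts);
        }
    }
}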