diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 6a7f6e3989ed..fb9098e1eccf 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -4036,27 +4036,19 @@ public void testBucketedExecution() } @Test - public void testScaleWriters() - { - testWithAllStorageFormats(this::testSingleWriter); - testWithAllStorageFormats(this::testMultipleWriters); - testWithAllStorageFormats(this::testMultipleWritersWithSkewedData); - } - - protected void testSingleWriter(Session session, HiveStorageFormat storageFormat) + public void testSingleWriter() { try { - // small table that will only have one writer - @Language("SQL") String createTableSql = format("" + - "CREATE TABLE scale_writers_small WITH (format = '%s') AS " + - "SELECT * FROM tpch.tiny.orders", - storageFormat); + // Small table that will only have one writer + @Language("SQL") String createTableSql = "" + + "CREATE TABLE scale_writers_small WITH (format = 'PARQUET') AS " + + "SELECT * FROM tpch.tiny.orders"; assertUpdate( - Session.builder(session) + Session.builder(getSession()) .setSystemProperty("task_writer_count", "1") .setSystemProperty("scale_writers", "true") .setSystemProperty("task_scale_writers_enabled", "false") - .setSystemProperty("writer_scaling_min_data_processed", "32MB") + .setSystemProperty("writer_scaling_min_data_processed", "100MB") .build(), createTableSql, (long) computeActual("SELECT count(*) FROM tpch.tiny.orders").getOnlyValue()); @@ -4068,24 +4060,23 @@ protected void testSingleWriter(Session session, HiveStorageFormat storageFormat } } - private void testMultipleWriters(Session session, HiveStorageFormat storageFormat) + @Test + public void testMultipleWriters() { try { - // large table that will scale writers to multiple machines - 
@Language("SQL") String createTableSql = format("" + - "CREATE TABLE scale_writers_large WITH (format = '%s') AS " + - "SELECT * FROM tpch.sf1.orders", - storageFormat); + // We need to use a large table (sf2) to see the effect. Otherwise, a single writer will write the entire + // data before ScaledWriterScheduler is able to scale it to multiple machines. + @Language("SQL") String createTableSql = "CREATE TABLE scale_writers_large WITH (format = 'PARQUET') AS " + + "SELECT * FROM tpch.sf2.orders"; assertUpdate( - Session.builder(session) + Session.builder(getSession()) .setSystemProperty("task_writer_count", "1") .setSystemProperty("scale_writers", "true") .setSystemProperty("task_scale_writers_enabled", "false") .setSystemProperty("writer_scaling_min_data_processed", "1MB") - .setCatalogSessionProperty(catalog, "parquet_writer_block_size", "4MB") .build(), createTableSql, - (long) computeActual("SELECT count(*) FROM tpch.sf1.orders").getOnlyValue()); + (long) computeActual("SELECT count(*) FROM tpch.sf2.orders").getOnlyValue()); long files = (long) computeScalar("SELECT count(DISTINCT \"$path\") FROM scale_writers_large"); long workers = (long) computeScalar("SELECT count(*) FROM system.runtime.nodes"); @@ -4096,24 +4087,24 @@ private void testMultipleWriters(Session session, HiveStorageForma } - private void testMultipleWritersWithSkewedData(Session session, HiveStorageFormat storageFormat) + @Test + public void testMultipleWritersWithSkewedData() { try { - // skewed table that will scale writers to multiple machines - String selectSql = "SELECT t1.* FROM (SELECT *, case when orderkey >= 0 then 1 else orderkey end as join_key FROM tpch.sf1.orders) t1 " + - "INNER JOIN (SELECT orderkey FROM tpch.sf1.orders) t2 " + + // We need to use a large table (sf2) to see the effect. Otherwise, a single writer will write the entire + // data before ScaledWriterScheduler is able to scale it to multiple machines. 
+ // Skewed table that will scale writers to multiple machines. + String selectSql = "SELECT t1.* FROM (SELECT *, case when orderkey >= 0 then 1 else orderkey end as join_key FROM tpch.sf2.orders) t1 " + + "INNER JOIN (SELECT orderkey FROM tpch.sf2.orders) t2 " + "ON t1.join_key = t2.orderkey"; - @Language("SQL") String createTableSql = format("" + - "CREATE TABLE scale_writers_skewed WITH (format = '%s') AS " + selectSql, - storageFormat); + @Language("SQL") String createTableSql = "CREATE TABLE scale_writers_skewed WITH (format = 'PARQUET') AS " + selectSql; assertUpdate( - Session.builder(session) + Session.builder(getSession()) .setSystemProperty("task_writer_count", "1") .setSystemProperty("scale_writers", "true") .setSystemProperty("task_scale_writers_enabled", "false") .setSystemProperty("writer_scaling_min_data_processed", "1MB") .setSystemProperty("join_distribution_type", "PARTITIONED") - .setCatalogSessionProperty(catalog, "parquet_writer_block_size", "4MB") .build(), createTableSql, (long) computeActual("SELECT count(*) FROM (" + selectSql + ")") diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java index fac9de2e81f4..bf0f06e84add 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java @@ -51,9 +51,15 @@ protected QueryRunner createQueryRunner() } @Override - public void testScaleWriters() + public void testMultipleWriters() { - testWithAllStorageFormats(this::testSingleWriter); + // Not applicable for fault-tolerant mode. 
+ } + + @Override + public void testMultipleWritersWithSkewedData() + { + // Not applicable for fault-tolerant mode. } // We need to override this method because in the case of pipeline execution,