From e1148366ab8f35ecea707e6c53b1d71030adeb3c Mon Sep 17 00:00:00 2001 From: Gaurav Sehgal Date: Wed, 16 Aug 2023 13:39:21 -0400 Subject: [PATCH] Simplify Hive connector e2e scale writers tests First, We need to use large table (sf2) to see the effect. Otherwise, a single writer will write the entire data before ScaledWriterScheduler is able to scale it to multiple machines. Second, Since we now use page size to scale up instead of physical written bytes, we don't have to test on different file formats. This reduces the number of tests and overall runtime. --- .../plugin/hive/BaseHiveConnectorTest.java | 57 ++++++++----------- ...veFaultTolerantExecutionConnectorTest.java | 10 +++- 2 files changed, 32 insertions(+), 35 deletions(-) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 6a7f6e3989ed..fb9098e1eccf 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -4036,27 +4036,19 @@ public void testBucketedExecution() } @Test - public void testScaleWriters() - { - testWithAllStorageFormats(this::testSingleWriter); - testWithAllStorageFormats(this::testMultipleWriters); - testWithAllStorageFormats(this::testMultipleWritersWithSkewedData); - } - - protected void testSingleWriter(Session session, HiveStorageFormat storageFormat) + public void testSingleWriter() { try { - // small table that will only have one writer - @Language("SQL") String createTableSql = format("" + - "CREATE TABLE scale_writers_small WITH (format = '%s') AS " + - "SELECT * FROM tpch.tiny.orders", - storageFormat); + // Small table that will only have one writer + @Language("SQL") String createTableSql = "" + + "CREATE TABLE scale_writers_small WITH (format = 'PARQUET') AS " + + "SELECT * FROM tpch.tiny.orders"; assertUpdate( - Session.builder(session) + Session.builder(getSession()) .setSystemProperty("task_writer_count", "1") .setSystemProperty("scale_writers", "true") .setSystemProperty("task_scale_writers_enabled", "false") - .setSystemProperty("writer_scaling_min_data_processed", "32MB") + .setSystemProperty("writer_scaling_min_data_processed", "100MB") .build(), createTableSql, (long) computeActual("SELECT count(*) FROM tpch.tiny.orders").getOnlyValue()); @@ -4068,24 +4060,23 @@ protected void testSingleWriter(Session session, HiveStorageFormat storageFormat } } - private void testMultipleWriters(Session session, HiveStorageFormat storageFormat) + @Test + public void testMultipleWriters() { try { - // large table that will scale writers to multiple machines - @Language("SQL") String createTableSql = format("" + - "CREATE TABLE scale_writers_large WITH (format = '%s') AS " + - "SELECT * FROM tpch.sf1.orders", - storageFormat); + // We need to use large table (sf2) to see the effect. Otherwise, a single writer will write the entire + // data before ScaledWriterScheduler is able to scale it to multiple machines. + @Language("SQL") String createTableSql = "CREATE TABLE scale_writers_large WITH (format = 'PARQUET') AS " + + "SELECT * FROM tpch.sf2.orders"; assertUpdate( - Session.builder(session) + Session.builder(getSession()) .setSystemProperty("task_writer_count", "1") .setSystemProperty("scale_writers", "true") .setSystemProperty("task_scale_writers_enabled", "false") .setSystemProperty("writer_scaling_min_data_processed", "1MB") - .setCatalogSessionProperty(catalog, "parquet_writer_block_size", "4MB") .build(), createTableSql, - (long) computeActual("SELECT count(*) FROM tpch.sf1.orders").getOnlyValue()); + (long) computeActual("SELECT count(*) FROM tpch.sf2.orders").getOnlyValue()); long files = (long) computeScalar("SELECT count(DISTINCT \"$path\") FROM scale_writers_large"); long workers = (long) computeScalar("SELECT count(*) FROM system.runtime.nodes"); @@ -4096,24 +4087,24 @@ private void testMultipleWriters(Session session, HiveStorageFormat storageForma } } - private void testMultipleWritersWithSkewedData(Session session, HiveStorageFormat storageFormat) + @Test + public void testMultipleWritersWithSkewedData() { try { - // skewed table that will scale writers to multiple machines - String selectSql = "SELECT t1.* FROM (SELECT *, case when orderkey >= 0 then 1 else orderkey end as join_key FROM tpch.sf1.orders) t1 " + - "INNER JOIN (SELECT orderkey FROM tpch.sf1.orders) t2 " + + // We need to use large table (sf2) to see the effect. Otherwise, a single writer will write the entire + // data before ScaledWriterScheduler is able to scale it to multiple machines. + // Skewed table that will scale writers to multiple machines. + String selectSql = "SELECT t1.* FROM (SELECT *, case when orderkey >= 0 then 1 else orderkey end as join_key FROM tpch.sf2.orders) t1 " + + "INNER JOIN (SELECT orderkey FROM tpch.sf2.orders) t2 " + "ON t1.join_key = t2.orderkey"; - @Language("SQL") String createTableSql = format("" + - "CREATE TABLE scale_writers_skewed WITH (format = '%s') AS " + selectSql, - storageFormat); + @Language("SQL") String createTableSql = "CREATE TABLE scale_writers_skewed WITH (format = 'PARQUET') AS " + selectSql; assertUpdate( - Session.builder(session) + Session.builder(getSession()) .setSystemProperty("task_writer_count", "1") .setSystemProperty("scale_writers", "true") .setSystemProperty("task_scale_writers_enabled", "false") .setSystemProperty("writer_scaling_min_data_processed", "1MB") .setSystemProperty("join_distribution_type", "PARTITIONED") - .setCatalogSessionProperty(catalog, "parquet_writer_block_size", "4MB") .build(), createTableSql, (long) computeActual("SELECT count(*) FROM (" + selectSql + ")") diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java index fac9de2e81f4..bf0f06e84add 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java @@ -51,9 +51,15 @@ protected QueryRunner createQueryRunner() } @Override - public void testScaleWriters() + public void testMultipleWriters() { - testWithAllStorageFormats(this::testSingleWriter); + // Not applicable for fault-tolerant mode. + } + + @Override + public void testMultipleWritersWithSkewedData() + { + // Not applicable for fault-tolerant mode. } // We need to override this method because in the case of pipeline execution,