@@ -4036,27 +4036,19 @@ public void testBucketedExecution()
     }
 
     @Test
-    public void testScaleWriters()
-    {
-        testWithAllStorageFormats(this::testSingleWriter);
-        testWithAllStorageFormats(this::testMultipleWriters);
-        testWithAllStorageFormats(this::testMultipleWritersWithSkewedData);
-    }
-
-    protected void testSingleWriter(Session session, HiveStorageFormat storageFormat)
+    public void testSingleWriter()
     {
         try {
-            // small table that will only have one writer
-            @Language("SQL") String createTableSql = format("" +
-                    "CREATE TABLE scale_writers_small WITH (format = '%s') AS " +
-                    "SELECT * FROM tpch.tiny.orders",
-                    storageFormat);
+            // Small table that will only have one writer
+            @Language("SQL") String createTableSql = "" +
+                    "CREATE TABLE scale_writers_small WITH (format = 'PARQUET') AS " +
+                    "SELECT * FROM tpch.tiny.orders";
             assertUpdate(
-                    Session.builder(session)
+                    Session.builder(getSession())
                             .setSystemProperty("task_writer_count", "1")
                             .setSystemProperty("scale_writers", "true")
                             .setSystemProperty("task_scale_writers_enabled", "false")
-                            .setSystemProperty("writer_scaling_min_data_processed", "32MB")
+                            .setSystemProperty("writer_scaling_min_data_processed", "100MB")
                             .build(),
                     createTableSql,
                     (long) computeActual("SELECT count(*) FROM tpch.tiny.orders").getOnlyValue());
@@ -4068,24 +4060,23 @@ protected void testSingleWriter(Session session, HiveStorageFormat storageFormat)
         }
     }
 
-    private void testMultipleWriters(Session session, HiveStorageFormat storageFormat)
+    @Test
+    public void testMultipleWriters()
     {
         try {
-            // large table that will scale writers to multiple machines
-            @Language("SQL") String createTableSql = format("" +
-                    "CREATE TABLE scale_writers_large WITH (format = '%s') AS " +
-                    "SELECT * FROM tpch.sf1.orders",
-                    storageFormat);
+            // We need to use large table (sf2) to see the effect. Otherwise, a single writer will write the entire
+            // data before ScaledWriterScheduler is able to scale it to multiple machines.
+            @Language("SQL") String createTableSql = "CREATE TABLE scale_writers_large WITH (format = 'PARQUET') AS " +
+                    "SELECT * FROM tpch.sf2.orders";
             assertUpdate(
-                    Session.builder(session)
+                    Session.builder(getSession())
                             .setSystemProperty("task_writer_count", "1")
                             .setSystemProperty("scale_writers", "true")
                             .setSystemProperty("task_scale_writers_enabled", "false")
                             .setSystemProperty("writer_scaling_min_data_processed", "1MB")
-                            .setCatalogSessionProperty(catalog, "parquet_writer_block_size", "4MB")
                             .build(),
                     createTableSql,
-                    (long) computeActual("SELECT count(*) FROM tpch.sf1.orders").getOnlyValue());
+                    (long) computeActual("SELECT count(*) FROM tpch.sf2.orders").getOnlyValue());
 
             long files = (long) computeScalar("SELECT count(DISTINCT \"$path\") FROM scale_writers_large");
             long workers = (long) computeScalar("SELECT count(*) FROM system.runtime.nodes");
@@ -4096,24 +4087,24 @@ private void testMultipleWriters(Session session, HiveStorageFormat storageFormat)
         }
     }
 
-    private void testMultipleWritersWithSkewedData(Session session, HiveStorageFormat storageFormat)
+    @Test
+    public void testMultipleWritersWithSkewedData()
     {
         try {
-            // skewed table that will scale writers to multiple machines
-            String selectSql = "SELECT t1.* FROM (SELECT *, case when orderkey >= 0 then 1 else orderkey end as join_key FROM tpch.sf1.orders) t1 " +
-                    "INNER JOIN (SELECT orderkey FROM tpch.sf1.orders) t2 " +
+            // We need to use large table (sf2) to see the effect. Otherwise, a single writer will write the entire
+            // data before ScaledWriterScheduler is able to scale it to multiple machines.
+            // Skewed table that will scale writers to multiple machines.
+            String selectSql = "SELECT t1.* FROM (SELECT *, case when orderkey >= 0 then 1 else orderkey end as join_key FROM tpch.sf2.orders) t1 " +
+                    "INNER JOIN (SELECT orderkey FROM tpch.sf2.orders) t2 " +
                     "ON t1.join_key = t2.orderkey";
-            @Language("SQL") String createTableSql = format("" +
-                    "CREATE TABLE scale_writers_skewed WITH (format = '%s') AS " + selectSql,
-                    storageFormat);
+            @Language("SQL") String createTableSql = "CREATE TABLE scale_writers_skewed WITH (format = 'PARQUET') AS " + selectSql;
             assertUpdate(
-                    Session.builder(session)
+                    Session.builder(getSession())
                             .setSystemProperty("task_writer_count", "1")
                             .setSystemProperty("scale_writers", "true")
                             .setSystemProperty("task_scale_writers_enabled", "false")
                             .setSystemProperty("writer_scaling_min_data_processed", "1MB")
                             .setSystemProperty("join_distribution_type", "PARTITIONED")
-                            .setCatalogSessionProperty(catalog, "parquet_writer_block_size", "4MB")
                             .build(),
                     createTableSql,
                     (long) computeActual("SELECT count(*) FROM (" + selectSql + ")")
@@ -51,9 +51,15 @@ protected QueryRunner createQueryRunner()
     }
 
     @Override
-    public void testScaleWriters()
+    public void testMultipleWriters()
     {
-        testWithAllStorageFormats(this::testSingleWriter);
+        // Not applicable for fault-tolerant mode.
     }
 
+    @Override
+    public void testMultipleWritersWithSkewedData()
+    {
+        // Not applicable for fault-tolerant mode.
+    }
+
     // We need to override this method because in the case of pipeline execution,
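
For readability, here is a consolidated sketch of the new testMultipleWriters method as it reads once the changes above are applied. It is reassembled from the added and unchanged lines in the first file's diff; the final scaling assertion and the cleanup in the finally block are hidden behind the collapsed context, so those two parts are assumptions rather than text taken from this pull request.

    @Test
    public void testMultipleWriters()
    {
        try {
            // We need to use large table (sf2) to see the effect. Otherwise, a single writer will write the entire
            // data before ScaledWriterScheduler is able to scale it to multiple machines.
            @Language("SQL") String createTableSql = "CREATE TABLE scale_writers_large WITH (format = 'PARQUET') AS " +
                    "SELECT * FROM tpch.sf2.orders";
            assertUpdate(
                    Session.builder(getSession())
                            .setSystemProperty("task_writer_count", "1")
                            .setSystemProperty("scale_writers", "true")
                            .setSystemProperty("task_scale_writers_enabled", "false")
                            .setSystemProperty("writer_scaling_min_data_processed", "1MB")
                            .build(),
                    createTableSql,
                    (long) computeActual("SELECT count(*) FROM tpch.sf2.orders").getOnlyValue());

            long files = (long) computeScalar("SELECT count(DISTINCT \"$path\") FROM scale_writers_large");
            long workers = (long) computeScalar("SELECT count(*) FROM system.runtime.nodes");
            // Assumption (not visible in the diff): the hidden lines assert that writer scaling produced
            // more than one file without exceeding the number of workers, e.g.
            // assertThat(files).isBetween(2L, workers);
        }
        finally {
            // Assumption (not visible in the diff): the hidden cleanup drops the table created above.
            assertUpdate("DROP TABLE IF EXISTS scale_writers_large");
        }
    }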