Data layout optimization (strategy generation). Part 3: compaction strategy generation with cost/gain scores #116

Merged: 14 commits, Aug 2, 2024
@@ -26,8 +26,6 @@ public final class DataCompactionConfig {
RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT;
public static final long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT =
RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES_DEFAULT;
- public static final double ENTROPY_THRESHOLD_DEFAULT =
-     9E15; // file size < 87.5% default target size 512MB on average

@Builder.Default private long targetByteSize = TARGET_BYTE_SIZE_DEFAULT;
@Builder.Default private double minByteSizeRatio = MIN_BYTE_SIZE_RATIO_DEFAULT;
@@ -40,5 +38,4 @@ public final class DataCompactionConfig {
@Builder.Default private boolean partialProgressEnabled = true;
@Builder.Default private int partialProgressMaxCommits = PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT;
@Builder.Default private long maxFileGroupSizeBytes = MAX_FILE_GROUP_SIZE_BYTES_DEFAULT;
- @Builder.Default private double fileEntropyThreshold = ENTROPY_THRESHOLD_DEFAULT;
}
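
For context, the config is assembled via Lombok's @Builder; a minimal usage sketch with illustrative values (fields left unset fall back to the @Builder.Default values above):

// Illustrative values only; every field shown exists in the class above.
DataCompactionConfig config =
    DataCompactionConfig.builder()
        .targetByteSize(512L * 1024 * 1024) // target output file size in bytes
        .partialProgressEnabled(true)
        .partialProgressMaxCommits(10)
        .build();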
@@ -0,0 +1,20 @@
package com.linkedin.openhouse.datalayout.datasource;

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/** Represents table partition stats. */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PartitionStat {
// list of transformed values for a given table partition
// e.g. if table is partitioned by (datepartition: string, state: string)
// the list could be ["2024-01-01", "CA"]
private List<String> values;
private int fileCount;
}
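
A small illustrative instance, following the comment above (assumes a table partitioned by datepartition and state, and that java.util.Arrays is imported):

// Hypothetical partition values and file count.
PartitionStat stat =
    PartitionStat.builder()
        .values(Arrays.asList("2024-01-01", "CA"))
        .fileCount(42)
        .build();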
@@ -16,7 +16,7 @@ public class TableFileStats implements DataSource<FileStat> {
@Override
public Dataset<FileStat> get() {
return spark
.sql(String.format("SELECT file_path, file_size_in_bytes FROM %s.files", tableName))
.sql(String.format("SELECT file_path, file_size_in_bytes FROM %s.data_files", tableName))
.map(new FileStatMapper(), Encoders.bean(FileStat.class));
}

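For context, the switch above from the files metadata table to data_files narrows the query to data files (in newer Iceberg versions the files table can also include delete files). A standalone sketch of the same query, assuming a hypothetical Iceberg table db.tbl:

// db.tbl is a placeholder; the columns are those of Iceberg's data_files metadata table.
Dataset<Row> dataFiles =
    spark.sql("SELECT file_path, file_size_in_bytes FROM db.tbl.data_files");
dataFiles.show();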
@@ -0,0 +1,48 @@
package com.linkedin.openhouse.datalayout.datasource;

import java.util.ArrayList;
import java.util.List;
import lombok.Builder;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

/** Data source implementation for table partition statistics. */
@Builder
public class TablePartitionStats implements DataSource<PartitionStat> {
private final SparkSession spark;
private final String tableName;

@Override
public Dataset<PartitionStat> get() {
StructType partitionSchema =
spark.sql(String.format("SELECT * FROM %s.partitions", tableName)).schema();
try {
partitionSchema.apply("partition");
return spark
.sql(String.format("SELECT partition, file_count FROM %s.partitions", tableName))
.map(new TablePartitionStats.PartitionStatMapper(), Encoders.bean(PartitionStat.class));
} catch (IllegalArgumentException e) {
return spark
.sql(String.format("SELECT null, file_count FROM %s.partitions", tableName))
.map(new TablePartitionStats.PartitionStatMapper(), Encoders.bean(PartitionStat.class));
}
}

static class PartitionStatMapper implements MapFunction<Row, PartitionStat> {
@Override
public PartitionStat call(Row row) {
List<String> values = new ArrayList<>();
Row partition = row.getStruct(0);
if (partition != null) {
for (int i = 0; i < partition.size(); i++) {
values.add(partition.get(i).toString());
}
}
return PartitionStat.builder().values(values).fileCount(row.getInt(1)).build();
}
}
}
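
Usage mirrors the integration tests at the end of this diff; a minimal sketch, assuming an active SparkSession with the openhouse catalog selected:

// db.tbl is a placeholder table name.
Dataset<PartitionStat> stats =
    TablePartitionStats.builder().spark(spark).tableName("db.tbl").build().get();
stats.show(); // one row per partition: transformed partition values plus file count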
@@ -0,0 +1,8 @@
package com.linkedin.openhouse.datalayout.generator;

import com.linkedin.openhouse.datalayout.strategy.DataLayoutOptimizationStrategy;
import java.util.List;

public interface DataLayoutGenerator {
List<DataLayoutOptimizationStrategy> generate();
}
@@ -0,0 +1,132 @@
package com.linkedin.openhouse.datalayout.generator;

import com.linkedin.openhouse.datalayout.config.DataCompactionConfig;
import com.linkedin.openhouse.datalayout.datasource.FileStat;
import com.linkedin.openhouse.datalayout.datasource.TableFileStats;
import com.linkedin.openhouse.datalayout.datasource.TablePartitionStats;
import com.linkedin.openhouse.datalayout.strategy.DataLayoutOptimizationStrategy;
import java.util.Collections;
import java.util.List;
import lombok.Builder;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Encoders;
import scala.Tuple2;

/**
 * Data layout optimization strategy generator for OpenHouse. Generates a list of strategies,
 * each carrying a cost/gain score.
 */
@Builder
public class OpenHouseDataLayoutGenerator implements DataLayoutGenerator {
private static final long MB = 1024 * 1024;
private static final long FILE_BLOCK_SIZE_BYTES = 256 * MB;
private static final long FILE_BLOCK_MARGIN_BYTES = 10 * MB;
private static final long NUM_FILE_GROUPS_PER_COMMIT = 100;
private static final long MAX_NUM_COMMITS = 30;
private static final long MAX_BYTES_SIZE_RATIO = 10;
private static final long REWRITE_BYTES_PER_SECOND = 2 * MB;
private static final long EXECUTOR_MEMORY_GB = 2;
private static final int MAX_CONCURRENT_FILE_GROUP_REWRITES = 50;
private static final int REWRITE_PARALLELISM = 900; // number of Spark tasks to run in parallel
private static final long TARGET_BYTES_SIZE = 2 * FILE_BLOCK_SIZE_BYTES - FILE_BLOCK_MARGIN_BYTES;
private static final double COMPUTE_STARTUP_COST_GB_HR = 0.5;
private final TableFileStats tableFileStats;
private final TablePartitionStats tablePartitionStats;

/**
* Generate a list of data layout optimization strategies based on the table file stats and
* historic query patterns.
*/
@Override
public List<DataLayoutOptimizationStrategy> generate() {
// skip single partition and non-partitioned tables
if (tablePartitionStats.get().count() <= 1) {
return Collections.emptyList();
}
return Collections.singletonList(generateCompactionStrategy());
}

/**
 * Computes the compaction strategy for a partitioned table. Only files smaller than the minimum
 * size threshold are considered. The rewrite job parameters are calculated, as well as the
 * following:
 *
 * <ul>
 *   <li>Estimated cost is computed as the GB-hrs spent performing the compaction
 *   <li>Estimated gain is computed as the reduction in file count
 *   <li>Estimated score is computed as gain / cost, i.e. the number of files reduced per GB-hr of
 *       compute; the higher the score, the better the strategy
 * </ul>
 */
private DataLayoutOptimizationStrategy generateCompactionStrategy() {
Tuple2<Long, Integer> fileStats =
tableFileStats
.get()
.map((MapFunction<FileStat, Long>) FileStat::getSize, Encoders.LONG())
.filter(
(FilterFunction<Long>)
size ->
size < TARGET_BYTES_SIZE * DataCompactionConfig.MIN_BYTE_SIZE_RATIO_DEFAULT)
.map(
(MapFunction<Long, Tuple2<Long, Integer>>) size -> new Tuple2<>(size, 1),
Encoders.tuple(Encoders.LONG(), Encoders.INT()))
.reduce(
(ReduceFunction<Tuple2<Long, Integer>>)
(a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2));
long rewriteFileBytes = fileStats._1;
int rewriteFileCount = fileStats._2;

DataCompactionConfig.DataCompactionConfigBuilder configBuilder = DataCompactionConfig.builder();

configBuilder.targetByteSize(TARGET_BYTES_SIZE);

long estimatedFileGroupsCount =
Math.max(
tablePartitionStats.get().count(),
rewriteFileBytes / DataCompactionConfig.MAX_FILE_GROUP_SIZE_BYTES_DEFAULT);

int maxCommitsCount =
(int)
Math.min(
MAX_NUM_COMMITS,
(estimatedFileGroupsCount + NUM_FILE_GROUPS_PER_COMMIT - 1)
/ NUM_FILE_GROUPS_PER_COMMIT);
configBuilder.partialProgressMaxCommits(maxCommitsCount);

int estimatedTasksPerFileGroupCount =
(int) (DataCompactionConfig.MAX_FILE_GROUP_SIZE_BYTES_DEFAULT / TARGET_BYTES_SIZE);

configBuilder.maxConcurrentFileGroupRewrites(
Math.min(
MAX_CONCURRENT_FILE_GROUP_REWRITES,
(estimatedTasksPerFileGroupCount + REWRITE_PARALLELISM - 1)
/ estimatedTasksPerFileGroupCount));

// don't split large files
configBuilder.maxByteSizeRatio(MAX_BYTES_SIZE_RATIO);

long reducedFileCount = estimateReducedFileCount(rewriteFileBytes, rewriteFileCount);
double computeGbHr = estimateComputeGbHr(rewriteFileBytes);
// computeGbHr >= COMPUTE_STARTUP_COST_GB_HR
double reducedFileCountPerComputeGbHr = reducedFileCount / computeGbHr;
return DataLayoutOptimizationStrategy.builder()
.config(configBuilder.build())
.cost(computeGbHr)
.gain(reducedFileCount)
.score(reducedFileCountPerComputeGbHr)
.build();
}

private long estimateReducedFileCount(long rewriteFileBytes, int rewriteFileCount) {
// number of files after compaction rounded up
long resultFileCount = (rewriteFileBytes + TARGET_BYTES_SIZE - 1) / TARGET_BYTES_SIZE;
return Math.max(0, rewriteFileCount - resultFileCount);
}

private double estimateComputeGbHr(long rewriteBytes) {
double rewriteSeconds = rewriteBytes * 1.0 / REWRITE_BYTES_PER_SECOND;
double rewriteHours = rewriteSeconds / 3600.0;
return rewriteHours * EXECUTOR_MEMORY_GB + COMPUTE_STARTUP_COST_GB_HR;
}
}
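
To make the scoring concrete, a worked example using the constants above; the inputs (200 small files totaling 10 GiB) are hypothetical:

// Hypothetical inputs: 200 small files totaling 10 GiB.
long rewriteFileBytes = 10L * 1024 * 1024 * 1024;
int rewriteFileCount = 200;
long targetBytesSize = 2 * 256L * 1024 * 1024 - 10L * 1024 * 1024; // 502 MiB

// Files remaining after compaction, rounded up: ceil(10 GiB / 502 MiB) = 21.
long resultFileCount = (rewriteFileBytes + targetBytesSize - 1) / targetBytesSize;
long reducedFileCount = rewriteFileCount - resultFileCount; // gain = 179

// Cost: 10 GiB at 2 MiB/s = 5120 s, about 1.42 hr; times 2 GB executor memory, plus 0.5 GB-hr startup.
double computeGbHr = rewriteFileBytes / (2.0 * 1024 * 1024) / 3600.0 * 2 + 0.5; // about 3.34

double score = reducedFileCount / computeGbHr; // about 53.5 files reduced per GB-hr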
@@ -1,6 +1,6 @@
package com.linkedin.openhouse.datalayout.persistence;

- import com.linkedin.openhouse.datalayout.layoutselection.DataLayoutOptimizationStrategy;
+ import com.linkedin.openhouse.datalayout.strategy.DataLayoutOptimizationStrategy;
import java.util.List;

/** DAO interface for persisting and loading data layout optimization strategies. */
@@ -3,7 +3,7 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
- import com.linkedin.openhouse.datalayout.layoutselection.DataLayoutOptimizationStrategy;
+ import com.linkedin.openhouse.datalayout.strategy.DataLayoutOptimizationStrategy;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
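The implementation above persists strategies as JSON via Gson; a minimal round-trip sketch using the same imports (the TypeToken is needed to preserve the generic list type through serialization):

// strategies is a List<DataLayoutOptimizationStrategy> produced by a generator.
Gson gson = new GsonBuilder().create();
Type type = new TypeToken<List<DataLayoutOptimizationStrategy>>() {}.getType();
String json = gson.toJson(strategies, type);
List<DataLayoutOptimizationStrategy> loaded = gson.fromJson(json, type);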
@@ -1,4 +1,4 @@
- package com.linkedin.openhouse.datalayout.layoutselection;
+ package com.linkedin.openhouse.datalayout.strategy;

import com.linkedin.openhouse.datalayout.config.DataCompactionConfig;
import lombok.Builder;
@@ -14,6 +14,8 @@
@EqualsAndHashCode
public class DataLayoutOptimizationStrategy {
private final double score;
+ private final double cost;
+ private final double gain;
private final DataCompactionConfig config;
// TODO: support sorting config
}
@@ -0,0 +1,52 @@
package com.linkedin.openhouse.datalayout.datasource;

import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TablePartitionStatsTest extends OpenHouseSparkITest {
@Test
public void testPartitionedTablePartitionStats() throws Exception {
final String testTable = "db.test_table_partition_stats_partitioned";
try (SparkSession spark = getSparkSession()) {
spark.sql("USE openhouse");
spark.sql(
String.format(
"CREATE TABLE %s (id INT, data STRING, dt STRING) PARTITIONED BY (dt, id)",
testTable));
spark.sql(String.format("INSERT INTO %s VALUES (0, '0', '2024-01-01')", testTable));
spark.sql(String.format("INSERT INTO %s VALUES (1, '1', '2024-01-02')", testTable));
spark.sql(String.format("INSERT INTO %s VALUES (1, '2', '2024-01-02')", testTable));
TablePartitionStats tablePartitionStats =
TablePartitionStats.builder().spark(spark).tableName(testTable).build();
List<PartitionStat> stats = tablePartitionStats.get().collectAsList();
Assertions.assertEquals(2, stats.size());
stats.sort(Comparator.comparing(a -> a.getValues().get(0)));
Assertions.assertEquals(Arrays.asList("2024-01-01", "0"), stats.get(0).getValues());
Assertions.assertEquals(1, stats.get(0).getFileCount());
Assertions.assertEquals(Arrays.asList("2024-01-02", "1"), stats.get(1).getValues());
Assertions.assertEquals(2, stats.get(1).getFileCount());
}
}

@Test
public void testNonPartitionedTablePartitionStats() throws Exception {
final String testTable = "db.test_table_partition_stats_non_partitioned";
try (SparkSession spark = getSparkSession()) {
spark.sql("USE openhouse");
spark.sql(String.format("CREATE TABLE %s (id INT, data STRING)", testTable));
spark.sql(String.format("INSERT INTO %s VALUES (0, '0')", testTable));
spark.sql(String.format("INSERT INTO %s VALUES (1, '1')", testTable));
TablePartitionStats tablePartitionStats =
TablePartitionStats.builder().spark(spark).tableName(testTable).build();
List<PartitionStat> stats = tablePartitionStats.get().collectAsList();
Assertions.assertEquals(1, stats.size());
Assertions.assertTrue(stats.get(0).getValues().isEmpty());
Assertions.assertEquals(2, stats.get(0).getFileCount());
}
}
}