Skip to content

Commit

Permalink
Add partition stats data source
Browse files Browse the repository at this point in the history
  • Loading branch information
teamurko committed Jun 25, 2024
1 parent b861f93 commit 4f7d668
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.openhouse.datalayout.datasource;

import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand All @@ -10,7 +11,7 @@
@Builder
@NoArgsConstructor
@AllArgsConstructor
public final class FileStat {
public final class FileStat implements Serializable {
private String path;
private long size;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.linkedin.openhouse.datalayout.datasource;

import java.io.Serializable;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Represents table partition.
 *
 * <p>Serializable so instances can be shipped inside Spark datasets/closures.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PartitionStat implements Serializable {
  // String-rendered partition values; empty for a non-partitioned table.
  // NOTE(review): presumably ordered to match the table's partition spec — confirm with producer.
  private List<String> values;
  // Number of data files belonging to this partition.
  private int fileCount;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class TableFileStats implements DataSource<FileStat> {
@Override
public Dataset<FileStat> get() {
return spark
.sql(String.format("SELECT file_path, file_size_in_bytes FROM %s.files", tableName))
.sql(String.format("SELECT file_path, file_size_in_bytes FROM %s.data_files", tableName))
.map(new FileStatMapper(), Encoders.bean(FileStat.class));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.linkedin.openhouse.datalayout.datasource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.Builder;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

/** Data source implementation for table partition statistics. */
@Builder
public class TablePartitionStats implements DataSource<PartitionStat> {
  private final SparkSession spark;
  private final String tableName;

  /**
   * Get partition statistics dataset for the table.
   *
   * <p>Partitioned tables yield one row per partition, sorted by partition values; non-partitioned
   * tables yield a single row with an empty values list.
   *
   * @return Dataset of partition statistics sorted by partition values.
   */
  @Override
  public Dataset<PartitionStat> get() {
    StructType partitionSchema =
        spark.sql(String.format("SELECT * FROM %s.partitions", tableName)).schema();
    // The "partition" column is only present in the metadata table of partitioned tables.
    // Probe the schema's field names directly instead of catching IllegalArgumentException —
    // exceptions should not be used for control flow.
    String query;
    if (Arrays.asList(partitionSchema.fieldNames()).contains("partition")) {
      query =
          String.format(
              "SELECT partition, file_count FROM %s.partitions ORDER BY partition", tableName);
    } else {
      // Non-partitioned table: emit a null partition struct so the mapper produces empty values.
      query = String.format("SELECT null, file_count FROM %s.partitions", tableName);
    }
    return spark.sql(query).map(new PartitionStatMapper(), Encoders.bean(PartitionStat.class));
  }

  /** Maps a (partition struct, file_count) row to a {@link PartitionStat}. */
  static class PartitionStatMapper implements MapFunction<Row, PartitionStat> {
    @Override
    public PartitionStat call(Row row) {
      List<String> values = new ArrayList<>();
      Row partition = row.getStruct(0);
      if (partition != null) {
        for (int i = 0; i < partition.size(); i++) {
          // String.valueOf avoids an NPE when an individual partition value is null
          // (the original partition.get(i).toString() would throw).
          values.add(String.valueOf(partition.get(i)));
        }
      }
      return PartitionStat.builder().values(values).fileCount(row.getInt(1)).build();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.linkedin.openhouse.datalayout.datasource;

import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TablePartitionStatsTest extends OpenHouseSparkITest {
  @Test
  public void testPartitionedTablePartitionStats() throws Exception {
    final String tableName = "db.test_table_partition_stats_partitioned";
    try (SparkSession spark = getSparkSession()) {
      spark.sql("USE openhouse");
      // Partitioning by (dt, id) gives each distinct (dt, id) pair its own partition.
      spark.sql(
          String.format(
              "CREATE TABLE %s (id INT, data STRING, dt STRING) PARTITIONED BY (dt, id)",
              tableName));
      spark.sql(String.format("INSERT INTO %s VALUES (0, '0', '2024-01-01')", tableName));
      spark.sql(String.format("INSERT INTO %s VALUES (1, '1', '2024-01-02')", tableName));
      spark.sql(String.format("INSERT INTO %s VALUES (1, '2', '2024-01-02')", tableName));
      List<PartitionStat> result =
          TablePartitionStats.builder()
              .spark(spark)
              .tableName(tableName)
              .build()
              .get()
              .collectAsList();
      // Two partitions, returned in partition-value order; the second holds two inserts.
      Assertions.assertEquals(2, result.size());
      Assertions.assertEquals(Arrays.asList("2024-01-01", "0"), result.get(0).getValues());
      Assertions.assertEquals(1, result.get(0).getFileCount());
      Assertions.assertEquals(Arrays.asList("2024-01-02", "1"), result.get(1).getValues());
      Assertions.assertEquals(2, result.get(1).getFileCount());
    }
  }

  @Test
  public void testNonPartitionedTablePartitionStats() throws Exception {
    final String tableName = "db.test_table_partition_stats_non_partitioned";
    try (SparkSession spark = getSparkSession()) {
      spark.sql("USE openhouse");
      spark.sql(String.format("CREATE TABLE %s (id INT, data STRING)", tableName));
      spark.sql(String.format("INSERT INTO %s VALUES (0, '0')", tableName));
      spark.sql(String.format("INSERT INTO %s VALUES (1, '1')", tableName));
      List<PartitionStat> result =
          TablePartitionStats.builder()
              .spark(spark)
              .tableName(tableName)
              .build()
              .get()
              .collectAsList();
      // A non-partitioned table collapses to a single row with no partition values.
      Assertions.assertEquals(1, result.size());
      Assertions.assertTrue(result.get(0).getValues().isEmpty());
      Assertions.assertEquals(2, result.get(0).getFileCount());
    }
  }
}

0 comments on commit 4f7d668

Please sign in to comment.