@@ -61,7 +61,16 @@
 import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
 import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId;
 import io.trino.plugin.iceberg.procedure.MigrationUtils.RecursiveDirectory;
-import io.trino.plugin.iceberg.system.files.FilesTable;
+import io.trino.plugin.iceberg.system.AllManifestsTable;
+import io.trino.plugin.iceberg.system.EntriesTable;
+import io.trino.plugin.iceberg.system.FilesTable;
+import io.trino.plugin.iceberg.system.HistoryTable;
+import io.trino.plugin.iceberg.system.ManifestsTable;
+import io.trino.plugin.iceberg.system.MetadataLogEntriesTable;
+import io.trino.plugin.iceberg.system.PartitionsTable;
+import io.trino.plugin.iceberg.system.PropertiesTable;
+import io.trino.plugin.iceberg.system.RefsTable;
+import io.trino.plugin.iceberg.system.SnapshotsTable;
 import io.trino.plugin.iceberg.util.DataFileWithDeleteFiles;
 import io.trino.spi.ErrorCode;
 import io.trino.spi.RefreshType;
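
Note: this import block is the consumer side of the reorganization. The ten system-table classes (AllManifestsTable through SnapshotsTable) now live in a dedicated io.trino.plugin.iceberg.system package rather than the connector root, and FilesTable moves up out of its system.files subpackage, so the single old import is replaced by one import per table class.
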
@@ -43,7 +43,7 @@
 import static io.trino.spi.function.InvocationConvention.simpleConvention;
 import static java.util.Objects.requireNonNull;
 
-record IcebergStatistics(
+public record IcebergStatistics(
         long recordCount,
         long fileCount,
         long size,
@@ -53,7 +53,7 @@ record IcebergStatistics(
         Map<Integer, Long> nanCounts,
         Map<Integer, Long> columnSizes)
 {
-    IcebergStatistics
+    public IcebergStatistics
     {
         minValues = ImmutableMap.copyOf(requireNonNull(minValues, "minValues is null"));
         maxValues = ImmutableMap.copyOf(requireNonNull(maxValues, "maxValues is null"));
@@ -62,7 +62,7 @@ record IcebergStatistics(
         columnSizes = ImmutableMap.copyOf(requireNonNull(columnSizes, "columnSizes is null"));
     }
 
-    static class Builder
+    public static class Builder
    {
         private final TypeManager typeManager;
         private final Map<Integer, io.trino.spi.type.Type> fieldIdToTrinoType;
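
Note: the record, its compact constructor, and the nested Builder all widen from package-private to public because the system tables that consume IcebergStatistics now sit in a different package (io.trino.plugin.iceberg.system), and package-private members are invisible outside their own package. The bare IcebergStatistics line is Java's compact-constructor syntax: it declares the canonical constructor without re-listing the components. A minimal, self-contained sketch of both ideas, using hypothetical names rather than Trino code:

// stats/Totals.java
package stats;

public record Totals(long recordCount, long fileCount)
{
    public Totals // compact constructor: runs before the record fields are assigned
    {
        if (recordCount < 0 || fileCount < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
    }
}

// app/Main.java — a second package, standing in for io.trino.plugin.iceberg.system;
// this import only compiles because Totals is declared public
package app;

import stats.Totals;

public class Main
{
    public static void main(String[] args)
    {
        System.out.println(new Totals(42, 7)); // prints Totals[recordCount=42, fileCount=7]
    }
}
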
@@ -49,7 +49,6 @@
 import io.trino.spi.predicate.ValueSet;
 import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.Int128;
-import io.trino.spi.type.RowType;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
 import io.trino.spi.type.TypeOperators;
@@ -1239,35 +1238,6 @@ public static long getModificationTime(String path, TrinoFileSystem fileSystem)
         }
     }
 
-    public static Optional<IcebergPartitionColumn> getPartitionColumnType(List<PartitionField> fields, Schema schema, TypeManager typeManager)
-    {
-        if (fields.isEmpty()) {
-            return Optional.empty();
-        }
-        List<RowType.Field> partitionFields = fields.stream()
-                .map(field -> RowType.field(
-                        field.name(),
-                        toTrinoType(field.transform().getResultType(schema.findType(field.sourceId())), typeManager)))
-                .collect(toImmutableList());
-        List<Integer> fieldIds = fields.stream()
-                .map(PartitionField::fieldId)
-                .collect(toImmutableList());
-        return Optional.of(new IcebergPartitionColumn(RowType.from(partitionFields), fieldIds));
-    }
-
-    public static List<org.apache.iceberg.types.Type> partitionTypes(
-            List<PartitionField> partitionFields,
-            Map<Integer, PrimitiveType> idToPrimitiveTypeMapping)
-    {
-        ImmutableList.Builder<org.apache.iceberg.types.Type> partitionTypeBuilder = ImmutableList.builder();
-        for (PartitionField partitionField : partitionFields) {
-            PrimitiveType sourceType = idToPrimitiveTypeMapping.get(partitionField.sourceId());
-            org.apache.iceberg.types.Type type = partitionField.transform().getResultType(sourceType);
-            partitionTypeBuilder.add(type);
-        }
-        return partitionTypeBuilder.build();
-    }
-
     public static ManifestReader<? extends ContentFile<?>> readerForManifest(ManifestFile manifest, Table table)
     {
         return readerForManifest(manifest, table.io(), table.specs());
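
Note: these two helpers are moved, not deleted — static imports added later in this diff resolve them from io.trino.plugin.iceberg.util.SystemTableUtil, with getPartitionColumnType's parameter order changed to put typeManager first. SystemTableUtil itself is not part of this section, so the following is a sketch of the relocated method inferred from the removed body and the new call sites, not the actual new source:

public static Optional<IcebergPartitionColumn> getPartitionColumnType(TypeManager typeManager, List<PartitionField> fields, Schema schema)
{
    // identical logic to the removed IcebergUtil version; only the parameter order differs
    if (fields.isEmpty()) {
        return Optional.empty();
    }
    List<RowType.Field> partitionFields = fields.stream()
            .map(field -> RowType.field(
                    field.name(),
                    toTrinoType(field.transform().getResultType(schema.findType(field.sourceId())), typeManager)))
            .collect(toImmutableList());
    List<Integer> fieldIds = fields.stream()
            .map(PartitionField::fieldId)
            .collect(toImmutableList());
    return Optional.of(new IcebergPartitionColumn(RowType.from(partitionFields), fieldIds));
}

The remaining files in the diff receive the matching relocation: each system-table class leaves io.trino.plugin.iceberg for io.trino.plugin.iceberg.system via a one-line package change.
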
@@ -11,7 +11,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.trino.plugin.iceberg;
+package io.trino.plugin.iceberg.system;
 
 import com.google.common.collect.ImmutableList;
 import io.trino.plugin.iceberg.util.PageListBuilder;

@@ -11,7 +11,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.trino.plugin.iceberg;
+package io.trino.plugin.iceberg.system;
 
 import com.google.common.collect.ImmutableMap;
 import io.trino.plugin.iceberg.util.PageListBuilder;

@@ -11,10 +11,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.trino.plugin.iceberg;
+package io.trino.plugin.iceberg.system;
 
 import com.google.common.collect.ImmutableList;
-import io.trino.plugin.iceberg.system.files.FilesTable;
+import io.trino.plugin.iceberg.IcebergUtil;
 import io.trino.plugin.iceberg.util.PageListBuilder;
 import io.trino.spi.block.ArrayBlockBuilder;
 import io.trino.spi.block.MapBlockBuilder;
@@ -35,7 +35,9 @@
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Type.PrimitiveType;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.StructProjection;
 
 import java.nio.ByteBuffer;
@@ -49,11 +51,11 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.airlift.slice.Slices.wrappedHeapBuffer;
 import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
-import static io.trino.plugin.iceberg.IcebergUtil.getPartitionColumnType;
-import static io.trino.plugin.iceberg.IcebergUtil.partitionTypes;
 import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
-import static io.trino.plugin.iceberg.PartitionsTable.getAllPartitionFields;
-import static io.trino.plugin.iceberg.system.files.FilesTable.getIcebergIdToTypeMapping;
+import static io.trino.plugin.iceberg.util.SystemTableUtil.getAllPartitionFields;
+import static io.trino.plugin.iceberg.util.SystemTableUtil.getPartitionColumnType;
+import static io.trino.plugin.iceberg.util.SystemTableUtil.partitionTypes;
+import static io.trino.plugin.iceberg.util.SystemTableUtil.readableMetricsToJson;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.StandardTypes.JSON;
@@ -72,8 +74,8 @@
 public class EntriesTable
         extends BaseSystemTable
 {
-    private final Map<Integer, Type> idToTypeMapping;
-    private final List<Types.NestedField> primitiveFields;
+    private final Map<Integer, PrimitiveType> idToTypeMapping;
+    private final List<NestedField> primitiveFields;
     private final Optional<IcebergPartitionColumn> partitionColumn;
     private final List<Type> partitionTypes;
 
@@ -87,13 +89,13 @@ public EntriesTable(TypeManager typeManager, SchemaTableName tableName, Table ic
                 metadataTableType,
                 executor);
         checkArgument(metadataTableType == ALL_ENTRIES || metadataTableType == ENTRIES, "Unexpected metadata table type: %s", metadataTableType);
-        idToTypeMapping = getIcebergIdToTypeMapping(icebergTable.schema());
+        idToTypeMapping = primitiveFieldTypes(icebergTable.schema());
         primitiveFields = IcebergUtil.primitiveFields(icebergTable.schema()).stream()
-                .sorted(Comparator.comparing(Types.NestedField::name))
+                .sorted(Comparator.comparing(NestedField::name))
                 .collect(toImmutableList());
         List<PartitionField> partitionFields = getAllPartitionFields(icebergTable);
-        partitionColumn = getPartitionColumnType(partitionFields, icebergTable.schema(), typeManager);
-        partitionTypes = partitionTypes(partitionFields, primitiveFieldTypes(icebergTable.schema()));
+        partitionColumn = getPartitionColumnType(typeManager, partitionFields, icebergTable.schema());
+        partitionTypes = partitionTypes(partitionFields, idToTypeMapping);
     }
 
     private static List<ColumnMetadata> columns(TypeManager typeManager, Table icebergTable)
@@ -111,7 +113,7 @@ private static List<ColumnMetadata> columns(TypeManager typeManager, Table icebe
     private static List<RowType.Field> dataFileFieldMetadata(TypeManager typeManager, Table icebergTable)
     {
         List<PartitionField> partitionFields = getAllPartitionFields(icebergTable);
-        Optional<IcebergPartitionColumn> partitionColumnType = getPartitionColumnType(partitionFields, icebergTable.schema(), typeManager);
+        Optional<IcebergPartitionColumn> partitionColumnType = getPartitionColumnType(typeManager, partitionFields, icebergTable.schema());
 
         ImmutableList.Builder<RowType.Field> fields = ImmutableList.builder();
         fields.add(new RowType.Field(Optional.of("content"), INTEGER));
@@ -145,7 +147,7 @@ protected void addRow(PageListBuilder pagesBuilder, Row row, TimeZoneKey timeZon
         StructProjection dataFile = row.get("data_file", StructProjection.class);
         appendDataFile((RowBlockBuilder) pagesBuilder.nextColumn(), dataFile);
         ReadableMetricsStruct readableMetrics = row.get("readable_metrics", ReadableMetricsStruct.class);
-        String readableMetricsJson = FilesTable.toJson(readableMetrics, primitiveFields);
+        String readableMetricsJson = readableMetricsToJson(readableMetrics, primitiveFields);
         pagesBuilder.appendVarchar(readableMetricsJson);
         pagesBuilder.endRow();
     }
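
Note: replacing getIcebergIdToTypeMapping with IcebergUtil.primitiveFieldTypes narrows the field from Map<Integer, Type> to Map<Integer, PrimitiveType>, which in turn lets the constructor pass idToTypeMapping straight into partitionTypes instead of recomputing primitiveFieldTypes a second time. The narrowing is safe here because Iceberg partition transforms only ever apply to primitive source columns, so the struct/list/map entries that the removed mapping also carried were never looked up. A self-contained sketch of the primitive-only walk (illustration only, not Trino's implementation):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types;

import java.util.LinkedHashMap;
import java.util.Map;

public class PrimitiveFieldTypesSketch
{
    // keep only leaf (primitive) fields, keyed by field id
    public static Map<Integer, PrimitiveType> primitiveFieldTypes(Schema schema)
    {
        Map<Integer, PrimitiveType> result = new LinkedHashMap<>();
        schema.columns().forEach(field -> collect(field, result));
        return result;
    }

    private static void collect(Types.NestedField field, Map<Integer, PrimitiveType> result)
    {
        Type type = field.type();
        if (type.isPrimitiveType()) {
            result.put(field.fieldId(), type.asPrimitiveType());
        }
        else {
            // recurse into structs, lists, and maps; the container ids themselves are dropped
            type.asNestedType().fields().forEach(child -> collect(child, result));
        }
    }

    public static void main(String[] args)
    {
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "point", Types.StructType.of(
                        Types.NestedField.required(3, "x", Types.DoubleType.get()),
                        Types.NestedField.required(4, "y", Types.DoubleType.get()))));
        System.out.println(primitiveFieldTypes(schema)); // {1=int, 3=double, 4=double}
    }
}
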
@@ -11,15 +11,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.trino.plugin.iceberg.system.files;
+package io.trino.plugin.iceberg.system;
 
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import io.trino.plugin.base.util.JsonUtils;
-import io.trino.plugin.iceberg.IcebergPartitionColumn;
-import io.trino.plugin.iceberg.PartitionsTable;
+import io.trino.plugin.iceberg.system.files.FilesTableSplitSource;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorSession;
@@ -34,26 +29,19 @@
 import io.trino.spi.type.TypeSignature;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
-import org.apache.iceberg.MetricsUtil.ReadableColMetricsStruct;
-import org.apache.iceberg.MetricsUtil.ReadableMetricsStruct;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpecParser;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.SingleValueParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.types.Types;
 
-import java.io.IOException;
-import java.io.StringWriter;
-import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static io.trino.plugin.iceberg.IcebergUtil.getPartitionColumnType;
+import static io.trino.plugin.iceberg.util.SystemTableUtil.getAllPartitionFields;
+import static io.trino.plugin.iceberg.util.SystemTableUtil.getPartitionColumnType;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.StandardTypes.JSON;
@@ -65,26 +53,24 @@
 public final class FilesTable
         implements SystemTable
 {
-    private static final JsonFactory JSON_FACTORY = JsonUtils.jsonFactoryBuilder().build();
-
-    static final String CONTENT_COLUMN_NAME = "content";
-    static final String FILE_PATH_COLUMN_NAME = "file_path";
-    static final String FILE_FORMAT_COLUMN_NAME = "file_format";
-    static final String SPEC_ID_COLUMN_NAME = "spec_id";
-    static final String PARTITION_COLUMN_NAME = "partition";
-    static final String RECORD_COUNT_COLUMN_NAME = "record_count";
-    static final String FILE_SIZE_IN_BYTES_COLUMN_NAME = "file_size_in_bytes";
-    static final String COLUMN_SIZES_COLUMN_NAME = "column_sizes";
-    static final String VALUE_COUNTS_COLUMN_NAME = "value_counts";
-    static final String NULL_VALUE_COUNTS_COLUMN_NAME = "null_value_counts";
-    static final String NAN_VALUE_COUNTS_COLUMN_NAME = "nan_value_counts";
-    static final String LOWER_BOUNDS_COLUMN_NAME = "lower_bounds";
-    static final String UPPER_BOUNDS_COLUMN_NAME = "upper_bounds";
-    static final String KEY_METADATA_COLUMN_NAME = "key_metadata";
-    static final String SPLIT_OFFSETS_COLUMN_NAME = "split_offsets";
-    static final String EQUALITY_IDS_COLUMN_NAME = "equality_ids";
-    static final String SORT_ORDER_ID_COLUMN_NAME = "sort_order_id";
-    static final String READABLE_METRICS_COLUMN_NAME = "readable_metrics";
+    public static final String CONTENT_COLUMN_NAME = "content";
+    public static final String FILE_PATH_COLUMN_NAME = "file_path";
+    public static final String FILE_FORMAT_COLUMN_NAME = "file_format";
+    public static final String SPEC_ID_COLUMN_NAME = "spec_id";
+    public static final String PARTITION_COLUMN_NAME = "partition";
+    public static final String RECORD_COUNT_COLUMN_NAME = "record_count";
+    public static final String FILE_SIZE_IN_BYTES_COLUMN_NAME = "file_size_in_bytes";
+    public static final String COLUMN_SIZES_COLUMN_NAME = "column_sizes";
+    public static final String VALUE_COUNTS_COLUMN_NAME = "value_counts";
+    public static final String NULL_VALUE_COUNTS_COLUMN_NAME = "null_value_counts";
+    public static final String NAN_VALUE_COUNTS_COLUMN_NAME = "nan_value_counts";
+    public static final String LOWER_BOUNDS_COLUMN_NAME = "lower_bounds";
+    public static final String UPPER_BOUNDS_COLUMN_NAME = "upper_bounds";
+    public static final String KEY_METADATA_COLUMN_NAME = "key_metadata";
+    public static final String SPLIT_OFFSETS_COLUMN_NAME = "split_offsets";
+    public static final String EQUALITY_IDS_COLUMN_NAME = "equality_ids";
+    public static final String SORT_ORDER_ID_COLUMN_NAME = "sort_order_id";
+    public static final String READABLE_METRICS_COLUMN_NAME = "readable_metrics";
 
     private static final List<String> COLUMN_NAMES = ImmutableList.of(
             CONTENT_COLUMN_NAME,
@@ -116,8 +102,8 @@ public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table iceb
         this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
         this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
 
-        List<PartitionField> partitionFields = PartitionsTable.getAllPartitionFields(icebergTable);
-        this.partitionColumnType = getPartitionColumnType(partitionFields, icebergTable.schema(), typeManager)
+        List<PartitionField> partitionFields = getAllPartitionFields(icebergTable);
+        this.partitionColumnType = getPartitionColumnType(typeManager, partitionFields, icebergTable.schema())
                 .map(IcebergPartitionColumn::rowType);
 
         ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
@@ -161,93 +147,7 @@ public Optional<ConnectorSplitSource> splitSource(ConnectorSession connectorSess
         }
     }
 
-    public static String toJson(ReadableMetricsStruct readableMetrics, List<Types.NestedField> primitiveFields)
-    {
-        StringWriter writer = new StringWriter();
-        try {
-            JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
-            generator.writeStartObject();
-
-            for (int i = 0; i < readableMetrics.size(); i++) {
-                Types.NestedField field = primitiveFields.get(i);
-                generator.writeFieldName(field.name());
-
-                generator.writeStartObject();
-                ReadableColMetricsStruct columnMetrics = readableMetrics.get(i, ReadableColMetricsStruct.class);
-
-                generator.writeFieldName("column_size");
-                Long columnSize = columnMetrics.get(0, Long.class);
-                if (columnSize == null) {
-                    generator.writeNull();
-                }
-                else {
-                    generator.writeNumber(columnSize);
-                }
-
-                generator.writeFieldName("value_count");
-                Long valueCount = columnMetrics.get(1, Long.class);
-                if (valueCount == null) {
-                    generator.writeNull();
-                }
-                else {
-                    generator.writeNumber(valueCount);
-                }
-
-                generator.writeFieldName("null_value_count");
-                Long nullValueCount = columnMetrics.get(2, Long.class);
-                if (nullValueCount == null) {
-                    generator.writeNull();
-                }
-                else {
-                    generator.writeNumber(nullValueCount);
-                }
-
-                generator.writeFieldName("nan_value_count");
-                Long nanValueCount = columnMetrics.get(3, Long.class);
-                if (nanValueCount == null) {
-                    generator.writeNull();
-                }
-                else {
-                    generator.writeNumber(nanValueCount);
-                }
-
-                generator.writeFieldName("lower_bound");
-                SingleValueParser.toJson(field.type(), columnMetrics.get(4, Object.class), generator);
-
-                generator.writeFieldName("upper_bound");
-                SingleValueParser.toJson(field.type(), columnMetrics.get(5, Object.class), generator);
-
-                generator.writeEndObject();
-            }
-
-            generator.writeEndObject();
-            generator.flush();
-            return writer.toString();
-        }
-        catch (IOException e) {
-            throw new UncheckedIOException("JSON conversion failed for: " + readableMetrics, e);
-        }
-    }
-
-    public static Map<Integer, org.apache.iceberg.types.Type> getIcebergIdToTypeMapping(Schema schema)
-    {
-        ImmutableMap.Builder<Integer, org.apache.iceberg.types.Type> icebergIdToTypeMapping = ImmutableMap.builder();
-        for (Types.NestedField field : schema.columns()) {
-            populateIcebergIdToTypeMapping(field, icebergIdToTypeMapping);
-        }
-        return icebergIdToTypeMapping.buildOrThrow();
-    }
-
-    private static void populateIcebergIdToTypeMapping(Types.NestedField field, ImmutableMap.Builder<Integer, org.apache.iceberg.types.Type> icebergIdToTypeMapping)
-    {
-        org.apache.iceberg.types.Type type = field.type();
-        icebergIdToTypeMapping.put(field.fieldId(), type);
-        if (type instanceof org.apache.iceberg.types.Type.NestedType) {
-            type.asNestedType().fields().forEach(child -> populateIcebergIdToTypeMapping(child, icebergIdToTypeMapping));
-        }
-    }
-
-    static Type getColumnType(String columnName, TypeManager typeManager)
+    public static Type getColumnType(String columnName, TypeManager typeManager)
     {
         return switch (columnName) {
             case CONTENT_COLUMN_NAME,
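
Note: the two big deletions here are also relocations rather than removals. The readable_metrics JSON writer resurfaces as SystemTableUtil.readableMetricsToJson — EntriesTable's call site above changes from FilesTable.toJson(readableMetrics, primitiveFields) to readableMetricsToJson(readableMetrics, primitiveFields) with identical arguments — and getIcebergIdToTypeMapping is superseded by IcebergUtil.primitiveFieldTypes. With the shared helpers gone, FilesTable keeps only its own table definition, and its column-name constants and getColumnType become public, presumably so that FilesTableSplitSource (still in the system.files package, per the new import) and other callers outside the class's new package can keep referencing them.
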
@@ -11,7 +11,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.trino.plugin.iceberg;
+package io.trino.plugin.iceberg.system;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;