Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class IcebergTableProperties
{
public static final String FILE_FORMAT_PROPERTY = "format";
public static final String PARTITIONING_PROPERTY = "partitioning";
public static final String SORTED_BY_PROPERTY = "sorted_by";
Comment thread
osscm marked this conversation as resolved.
public static final String LOCATION_PROPERTY = "location";
public static final String FORMAT_VERSION_PROPERTY = "format_version";
public static final String ORC_BLOOM_FILTER_COLUMNS = "orc_bloom_filter_columns";
Expand Down Expand Up @@ -69,6 +70,15 @@ public IcebergTableProperties(
false,
value -> (List<?>) value,
value -> value))
.add(new PropertyMetadata<>(
SORTED_BY_PROPERTY,
"Sorted columns",
new ArrayType(VARCHAR),
List.class,
ImmutableList.of(),
false,
value -> (List<?>) value,
value -> value))
.add(stringProperty(
LOCATION_PROPERTY,
"File system location URI for the table",
Expand Down Expand Up @@ -118,6 +128,13 @@ public static List<String> getPartitioning(Map<String, Object> tableProperties)
return partitioning == null ? ImmutableList.of() : ImmutableList.copyOf(partitioning);
}

@SuppressWarnings("unchecked")
public static List<String> getSortOrder(Map<String, Object> tableProperties)
Copy link
Copy Markdown
Member

@ebyhr ebyhr Jun 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sort-order is stored to metadata, but it isn't used from anywhere if my understanding is correct (wrong?). What's the benefit adding this property in the current shape? Don't we need to respect the property during writes? It would be nice if you can add tests showing the benefit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ebyhr this PR is the first step in supporting `sorted_by`.

So, this PR is only intended to add the support of sorted_by for the CREATE TABLE DDL syntax.

Subsequently, we can add it for ALTER TABLE, and then support it while writing.

DDL changes will help when Spark is being used for ingestion (which is the case we see almost all the time). Spark uses Iceberg's sorting spec when writing as well.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree adding sorted_by property without proper support. It's confusing to users.

{
List<String> sortedBy = (List<String>) tableProperties.get(SORTED_BY_PROPERTY);
return sortedBy == null ? ImmutableList.of() : ImmutableList.copyOf(sortedBy);
}

public static Optional<String> getTableLocation(Map<String, Object> tableProperties)
{
return Optional.ofNullable((String) tableProperties.get(LOCATION_PROPERTY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
Expand Down Expand Up @@ -98,9 +99,11 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.getOrcBloomFilterColumns;
import static io.trino.plugin.iceberg.IcebergTableProperties.getOrcBloomFilterFpp;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergTableProperties.getSortOrder;
import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
import static io.trino.plugin.iceberg.SortFields.parseSortFields;
import static io.trino.plugin.iceberg.TrinoTypes.getNextValue;
import static io.trino.plugin.iceberg.TrinoTypes.getPreviousValue;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
Expand Down Expand Up @@ -148,7 +151,13 @@

public final class IcebergUtil
{
private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*");
private static final Pattern SIMPLE_NAME = Pattern.compile("\\s*?[a-z][a-z0-9]*\\s*?");
private static final String UNQUOTED_IDENTIFIER = "\\s*?[a-zA-Z_][a-zA-Z0-9_]*\\s*?";
private static final Pattern UNQUOTED_IDENTIFIER_PATTERN = Pattern.compile(UNQUOTED_IDENTIFIER);
private static final String QUOTED_IDENTIFIER = "\"(?:\"\"|[^\"A-Z])*\"";
public static final String IDENTIFIER = "(" + UNQUOTED_IDENTIFIER + "|" + QUOTED_IDENTIFIER + ")";
public static final String FUNCTION_ARGUMENT_NAME = "\\s*?\\(" + IDENTIFIER + "\\)\\s*?";
public static final String FUNCTION_ARGUMENT_NAME_AND_INT = "\\s*?\\(" + IDENTIFIER + ",\\s*?(\\d+)\\s*?\\)";

private IcebergUtil() {}

Expand Down Expand Up @@ -565,6 +574,7 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec
SchemaTableName schemaTableName = tableMetadata.getTable();
Schema schema = schemaFromMetadata(tableMetadata.getColumns());
PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
SortOrder sortOrder = buildSortFields(schema, getSortOrder(tableMetadata.getProperties()));
String targetPath = getTableLocation(tableMetadata.getProperties())
.orElseGet(() -> catalog.defaultTableLocation(session, schemaTableName));

Expand All @@ -586,7 +596,7 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}

return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, targetPath, propertiesBuilder.buildOrThrow());
return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, targetPath, propertiesBuilder.buildOrThrow());
}

public static long getSnapshotIdAsOfTime(Table table, long epochMillis)
Expand All @@ -610,6 +620,22 @@ public static void validateTableCanBeDropped(Table table)
}
}

public static String fromIdentifier(String identifier)
{
    // Quoted identifier: strip the surrounding double quotes and collapse
    // doubled quotes ("" -> ") back to their literal form. The length check
    // guards the degenerate single-character input "\"", for which both
    // startsWith and endsWith are true and substring(1, 0) would throw
    // StringIndexOutOfBoundsException.
    if (identifier.length() >= 2 && identifier.startsWith("\"") && identifier.endsWith("\"")) {
        return identifier.substring(1, identifier.length() - 1).replace("\"\"", "\"");
    }
    // Unquoted identifiers are case-insensitive: canonicalize to lower case
    // with a fixed locale so the result does not depend on the JVM default
    // (e.g. the Turkish dotted/dotless 'i' problem).
    return identifier.toLowerCase(Locale.ENGLISH);
}

public static String toIdentifier(String column)
{
    // Column names that already parse as plain (unquoted) identifiers are
    // passed through untouched; everything else must be quoted.
    boolean isPlainIdentifier = UNQUOTED_IDENTIFIER_PATTERN.matcher(column).matches();
    if (!isPlainIdentifier) {
        // Standard SQL quoting: wrap in double quotes and escape any embedded
        // double quote by doubling it.
        String escaped = column.replace("\"", "\"\"");
        return "\"" + escaped + "\"";
    }
    return column;
}

private static void checkFormatForProperty(FileFormat actualStorageFormat, FileFormat expectedStorageFormat, String propertyName)
{
if (actualStorageFormat != expectedStorageFormat) {
Expand All @@ -626,4 +652,14 @@ private static void validateOrcBloomFilterColumns(ConnectorTableMetadata tableMe
throw new TrinoException(INVALID_TABLE_PROPERTY, format("Orc bloom filter columns %s not present in schema", Sets.difference(ImmutableSet.copyOf(orcBloomFilterColumns), allColumns)));
}
}

/**
 * Translates the raw {@code sorted_by} property values into an Iceberg {@link SortOrder}.
 * <p>
 * {@code parseSortFields} throws unchecked exceptions on malformed input; those are
 * re-thrown as {@code INVALID_TABLE_PROPERTY} so the user gets a clear error naming
 * the offending value instead of an internal stack trace, matching the style of the
 * other property validations in this class.
 */
private static SortOrder buildSortFields(Schema schema, List<String> fields)
{
    try {
        return parseSortFields(schema, fields);
    }
    catch (RuntimeException e) {
        // Include the raw field list so the user can see which value failed to parse.
        throw new TrinoException(INVALID_TABLE_PROPERTY, "Unable to parse sort order: " + fields, e);
    }
}
}
Loading