Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
Expand Down Expand Up @@ -163,6 +164,7 @@ public class IcebergMetadata
private static final Logger log = Logger.get(IcebergMetadata.class);

private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/[^/]+");
private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 1;

private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
Expand Down Expand Up @@ -774,6 +776,17 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle();
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

int tableFormatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (tableFormatVersion > OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION) {
// Currently, Optimize would fail when position deletes files are present in Iceberg table
throw new TrinoException(NOT_SUPPORTED, format(
"%s is not supported for Iceberg table format version > %d. Table %s format version is %s.",
OPTIMIZE.name(),
OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION,
table.getSchemaTableName(),
tableFormatVersion));
}

verify(transaction == null, "transaction already set");
transaction = icebergTable.newTransaction();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1469,6 +1469,22 @@ public void testMissingMetrics()
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testOptimizeFailsOnV2IcebergTable()
{
    String tableName = format("test_optimize_fails_on_v2_iceberg_table_%s", randomTableSuffix());
    String sparkTableName = sparkTableName(tableName);
    String trinoTableName = trinoTableName(tableName);

    // Create a format-version 2 table with merge-on-read deletes via Spark;
    // Trino's OPTIMIZE currently supports only format version 1, so it must be
    // rejected up front rather than producing a corrupt rewrite.
    onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(a INT, b INT) " +
            "USING ICEBERG PARTITIONED BY (b) " +
            "TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')");
    onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)");

    assertQueryFailure(() -> onTrino().executeQuery(format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName)))
            .hasMessageContaining("is not supported for Iceberg table format version > 1");

    // Clean up like the sibling tests do (cf. testMissingMetrics above) so the
    // created table does not leak into the shared catalog between test runs.
    onSpark().executeQuery("DROP TABLE " + sparkTableName);
}

private static String escapeSparkString(String value)
{
return value.replace("\\", "\\\\").replace("'", "\\'");
Expand Down