Merged
@@ -223,6 +223,7 @@ public class DeltaLakeMetadata
"org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat");
public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";
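// "ADD COLUMNS" matches the operation string Spark itself records in commitInfo for ALTER TABLE ... ADD COLUMNS (see the review thread below)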
public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
public static final String INSERT_OPERATION = "WRITE";
public static final String DELETE_OPERATION = "DELETE";
public static final String UPDATE_OPERATION = "UPDATE";
@@ -614,8 +615,10 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
.map(column -> toColumnHandle(column, partitionColumns))
.collect(toImmutableList());
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, targetPath.toString());
appendInitialTableEntries(
appendTableEntries(
0,
transactionLogWriter,
randomUUID().toString(),
deltaLakeColumns,
partitionColumns,
buildDeltaMetadataConfiguration(checkpointInterval),
@@ -865,8 +868,10 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
// filesystems for which we have proper implementations of TransactionLogSynchronizers.
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, handle.getLocation());

appendInitialTableEntries(
appendTableEntries(
0,
transactionLogWriter,
randomUUID().toString(),
handle.getInputColumns(),
handle.getPartitionedBy(),
buildDeltaMetadataConfiguration(handle.getCheckpointInterval()),
@@ -896,8 +901,52 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
return Optional.empty();
}

private static void appendInitialTableEntries(
@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata)
{
if (newColumnMetadata.getComment() != null) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with comments");
}

DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);

try {
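// Commit as the next version after the snapshot this operation read; a concurrent writer targeting the same version fails to commit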
long commitVersion = handle.getReadVersion() + 1;

List<String> partitionColumns = getPartitionedBy(tableMetadata.getProperties());
ImmutableList.Builder<DeltaLakeColumnHandle> columnsBuilder = ImmutableList.builder();
columnsBuilder.addAll(tableMetadata.getColumns().stream()
.filter(column -> !column.isHidden())
.map(column -> toColumnHandle(column, partitionColumns))
.collect(toImmutableList()));
columnsBuilder.add(toColumnHandle(newColumnMetadata, partitionColumns));

Optional<Long> checkpointInterval = DeltaLakeTableProperties.getCheckpointInterval(tableMetadata.getProperties());

TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation());
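// Reuse the existing table id; Spark logs "ERROR DeltaLog: Change in the table id detected" when the id changes between versions (see the review thread below)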
appendTableEntries(
commitVersion,
transactionLogWriter,
handle.getMetadataEntry().getId(),
columnsBuilder.build(),
partitionColumns,
buildDeltaMetadataConfiguration(checkpointInterval),
ADD_COLUMN_OPERATION,
session,
nodeVersion,
nodeId);
transactionLogWriter.flush();
}
catch (Exception e) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to add '%s' column for: %s.%s", newColumnMetadata.getName(), handle.getSchemaName(), handle.getTableName()), e);
}
}

private static void appendTableEntries(
long commitVersion,
TransactionLogWriter transactionLogWriter,
String tableId,
List<DeltaLakeColumnHandle> columns,
List<String> partitionColumnNames,
Map<String, String> configuration,
@@ -909,7 +958,7 @@ private static void appendInitialTableEntries(
long createdTime = System.currentTimeMillis();
transactionLogWriter.appendCommitInfoEntry(
new CommitInfoEntry(
0,
commitVersion,
createdTime,
session.getUser(),
session.getUser(),
@@ -926,7 +975,7 @@

transactionLogWriter.appendMetadataEntry(
new MetadataEntry(
randomUUID().toString(),
tableId,
null,
null,
new Format("parquet", ImmutableMap.of()),
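
As context for the commitVersion parameter introduced above: in the Delta protocol, each transaction-log commit is a newline-delimited JSON file under _delta_log/, named by the 20-digit zero-padded commit version (see the paths quoted in the review thread below). A minimal sketch of that convention; the helper name is illustrative, not part of the PR:

static String transactionLogCommitPath(String tableLocation, long commitVersion)
{
    // e.g. s3://bucket/table/_delta_log/00000000000000000005.json
    return String.format("%s/_delta_log/%020d.json", tableLocation, commitVersion);
}

Version-named commit files are also why the concurrent ADD COLUMN test below treats ".*/_delta_log/\d+.json already exists" as a permissible failure: two writers racing to the same version collide on the same file name.
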
@@ -13,8 +13,10 @@
*/
package io.trino.plugin.deltalake;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.execution.QueryInfo;
import io.trino.plugin.deltalake.util.DockerizedMinioDataLake;
import io.trino.testing.BaseConnectorTest;
@@ -29,17 +31,23 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Sets.union;
import static io.trino.plugin.deltalake.DeltaLakeDockerizedMinioDataLake.createDockerizedMinioDataLakeForDeltaLake;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_SCHEMA;
import static io.trino.testing.assertions.Assert.assertEquals;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -98,7 +106,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_TOPN_PUSHDOWN:
case SUPPORTS_AGGREGATION_PUSHDOWN:
case SUPPORTS_RENAME_TABLE:
case SUPPORTS_ADD_COLUMN:
case SUPPORTS_ADD_COLUMN_WITH_COMMENT:
case SUPPORTS_DROP_COLUMN:
case SUPPORTS_RENAME_COLUMN:
case SUPPORTS_COMMENT_ON_TABLE:
@@ -139,6 +147,20 @@ protected void verifyConcurrentInsertFailurePermissible(Exception e)
"|Target file .* was created during locking");
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageMatching("Unable to add '.*' column for: .*")
.getCause()
.hasMessageMatching(
"Transaction log locked.*" +
"|.*/_delta_log/\\d+.json already exists" +
"|Conflicting concurrent writes found..*" +
"|Multiple live locks found for:.*" +
"|Target file .* was created during locking");
}

@Override
protected Optional<DataMappingTestSetup> filterCaseSensitiveDataMappingTestData(DataMappingTestSetup dataMappingTestSetup)
{
@@ -308,14 +330,130 @@ public Object[][] timestampValues()
{"9999-12-31 23:59:59.999 UTC"}};
}

@Test
public void testAddColumnToPartitionedTable()

Member:
Could you please add tests that will check:

  • if Spark can read table with added columns
  • if optimize/vacuum works after adding columns?

ebyhr (Member, Author), May 13, 2022:
Added test about optimize & vacuum.

> if Spark can read table with added columns

Should I wait for #11565?

Member:
Verifying, even locally, that it works would be great imho.
Regarding the test - depends how long you would have to wait.
@findinpath what is the ETA for #11565?

Contributor:
#11565 is in a good shape now. I am assuming that the PR will be ready to be merged in the following days.

Member:
@ebyhr so my advice would be to wait for it and write a proper test

Member:
@ebyhr could you check that disabling the cache will actually help with this?

Member:
would running REFRESH TABLE in Spark help?

ebyhr (Member, Author):
@homar @findepi Disabling the cache didn't help. REFRESH TABLE logs the same error "ERROR DeltaLog: Change in the table id detected..." and subsequent SELECTs don't show the error.

JFYI: the error comes from https://github.com/delta-io/delta/blob/728bf902542077ce1c2e97ca67a53c53bb460c64/core/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala#L574-L575

ebyhr (Member, Author), May 24, 2022:
It seems Spark reuses the same metaData.id when adding a new column.

trino> ALTER TABLE delta.default.test ADD COLUMN c4 int;

s3://presto-ci-test/test/_delta_log/00000000000000000004.json

{"commitInfo":{"version":4,"timestamp":1653357796162,"userId":"yuya.ebihara","userName":"yuya.ebihara","operation":"ALTER TABLE","operationParameters":{"queryId":"20220524_020316_00004_imnpb"},"clusterId":"trino-dev-ffffffff-ffff-ffff-ffff-ffffffffffff","readVersion":0,"isolationLevel":"WriteSerializable","blindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"24b68017-cb79-4f0b-8ee7-e72a496bbaf4","format":{"provider":"parquet","options":{}},"schemaString":"{\"fields\":[{\"metadata\":{},\"name\":\"c1\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"c2\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"c3\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"c4\",\"nullable\":true,\"type\":\"integer\"}],\"type\":\"struct\"}","partitionColumns":[],"configuration":{},"createdTime":1653357796162}}
spark-sql> ALTER TABLE default.test ADD COLUMN (x5 int);

s3://presto-ci-test/test/_delta_log/00000000000000000005.json

{"commitInfo":{"timestamp":1653357938234,"operation":"ADD COLUMNS","operationParameters":{"columns":"[{\"column\":{\"name\":\"x5\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}}]"},"readVersion":4,"isBlindAppend":true,"operationMetrics":{}}}
{"metaData":{"id":"24b68017-cb79-4f0b-8ee7-e72a496bbaf4","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c4\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"x5\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1653357796162}}

Confirmed that setting MetadataEntry.id from DeltaLakeTableHandle.getMetadataEntry().getId() suppresses the Spark error message.
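
A minimal sketch (not part of the PR) of the invariant described above: metaData.id must stay constant across transaction-log commits, which is what Spark checks before logging "Change in the table id detected". Jackson for JSON parsing and the local _delta_log paths are illustrative assumptions.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class TableIdStabilityCheck
{
    public static void main(String[] args)
            throws IOException
    {
        ObjectMapper mapper = new ObjectMapper();
        String previousId = null;
        for (String commit : List.of("00000000000000000004.json", "00000000000000000005.json")) {
            // Each commit file holds one JSON action per line; only some lines carry a metaData action
            for (String line : Files.readAllLines(Path.of("_delta_log", commit))) {
                JsonNode metaData = mapper.readTree(line).get("metaData");
                if (metaData == null) {
                    continue;
                }
                String id = metaData.get("id").asText();
                if (previousId != null && !previousId.equals(id)) {
                    throw new IllegalStateException("Table id changed: " + previousId + " -> " + id);
                }
                previousId = id;
            }
        }
        System.out.println("Stable table id: " + previousId);
    }
}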

{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_partitioned_table_", "(x VARCHAR, part VARCHAR) WITH (partitioned_by = ARRAY['part'])")) {
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first', 'part-0001'", 1);
assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN x bigint", ".* Column 'x' already exists");
assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN part bigint", ".* Column 'part' already exists");

assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)");
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second', 'part-0002', 'xxx'", 1);
assertQuery(
"SELECT x, part, a FROM " + table.getName(),
"VALUES ('first', 'part-0001', NULL), ('second', 'part-0002', 'xxx')");

assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN b double");
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'third', 'part-0003', 'yyy', 33.3E0", 1);
assertQuery(
"SELECT x, part, a, b FROM " + table.getName(),
"VALUES ('first', 'part-0001', NULL, NULL), ('second', 'part-0002', 'xxx', NULL), ('third', 'part-0003', 'yyy', 33.3)");

assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN IF NOT EXISTS c varchar(50)");
assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN IF NOT EXISTS part varchar(50)");
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'fourth', 'part-0004', 'zzz', 55.3E0, 'newColumn'", 1);
assertQuery(
"SELECT x, part, a, b, c FROM " + table.getName(),
"VALUES ('first', 'part-0001', NULL, NULL, NULL), ('second', 'part-0002', 'xxx', NULL, NULL), ('third', 'part-0003', 'yyy', 33.3, NULL), ('fourth', 'part-0004', 'zzz', 55.3, 'newColumn')");
}
}

private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, ResultWithQueryId<MaterializedResult> queryResult)
{
return queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryResult.getQueryId());
}

@Test
public void testAddColumnAndOptimize()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_and_optimize", "(x VARCHAR)")) {
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first'", 1);

assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)");
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second', 'xxx'", 1);
assertQuery(
"SELECT x, a FROM " + table.getName(),
"VALUES ('first', NULL), ('second', 'xxx')");

Set<String> beforeActiveFiles = getActiveFiles(table.getName());
computeActual("ALTER TABLE " + table.getName() + " EXECUTE OPTIMIZE");

// Verify OPTIMIZE happened, but table data didn't change
assertThat(beforeActiveFiles).isNotEqualTo(getActiveFiles(table.getName()));
assertQuery(
"SELECT x, a FROM " + table.getName(),
"VALUES ('first', NULL), ('second', 'xxx')");
}
}

@Test
public void testAddColumnAndVacuum()
throws Exception
{
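// vacuum_min_retention is lowered so the 1s retention used below is permitted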
Session sessionWithShortRetentionUnlocked = Session.builder(getSession())
.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "vacuum_min_retention", "0s")
.build();

try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_and_vacuum", "(x VARCHAR)")) {
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first'", 1);
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second'", 1);

Set<String> initialFiles = getActiveFiles(table.getName());
assertThat(initialFiles).hasSize(2);

assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)");

assertUpdate("UPDATE " + table.getName() + " SET a = 'new column'", 2);
Stopwatch timeSinceUpdate = Stopwatch.createStarted();
Set<String> updatedFiles = getActiveFiles(table.getName());
assertThat(updatedFiles).hasSize(2).doesNotContainAnyElementsOf(initialFiles);
assertThat(getAllDataFilesFromTableDirectory(table.getName())).isEqualTo(union(initialFiles, updatedFiles));

assertQuery(
"SELECT x, a FROM " + table.getName(),
"VALUES ('first', 'new column'), ('second', 'new column')");

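// Wait out the 1s retention period so the files replaced by the UPDATE become eligible for removal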
MILLISECONDS.sleep(1_000 - timeSinceUpdate.elapsed(MILLISECONDS) + 1);
assertUpdate(sessionWithShortRetentionUnlocked, "CALL system.vacuum(schema_name => CURRENT_SCHEMA, table_name => '" + table.getName() + "', retention => '1s')");

// Verify VACUUM happened, but table data didn't change
assertThat(getAllDataFilesFromTableDirectory(table.getName())).isEqualTo(updatedFiles);
assertQuery(
"SELECT x, a FROM " + table.getName(),
"VALUES ('first', 'new column'), ('second', 'new column')");
}
}

@Override
protected String createSchemaSql(String schemaName)
{
return "CREATE SCHEMA " + schemaName + " WITH (location = 's3://" + bucketName + "/" + schemaName + "')";
}

private Set<String> getActiveFiles(String tableName)
{
return getActiveFiles(tableName, getQueryRunner().getDefaultSession());
}

private Set<String> getActiveFiles(String tableName, Session session)
{
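// The hidden $path column exposes the data file backing each row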
return computeActual(session, "SELECT DISTINCT \"$path\" FROM " + tableName).getOnlyColumnAsSet().stream()
.map(String.class::cast)
.collect(toImmutableSet());
}

private Set<String> getAllDataFilesFromTableDirectory(String tableName)
{
return getTableFiles(tableName).stream()
.filter(path -> !path.contains("/" + TRANSACTION_LOG_DIRECTORY))
.collect(toImmutableSet());
}

private List<String> getTableFiles(String tableName)
{
return dockerizedMinioDataLake.listFiles(format("%s/%s", SCHEMA, tableName)).stream()
.map(path -> format("s3://%s/%s", bucketName, path))
.collect(toImmutableList());
}
}