Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.facebook.presto.sql.tree.Revoke;
import com.facebook.presto.sql.tree.RevokeRoles;
import com.facebook.presto.sql.tree.Rollback;
import com.facebook.presto.sql.tree.SetProperties;
import com.facebook.presto.sql.tree.SetRole;
import com.facebook.presto.sql.tree.SetSession;
import com.facebook.presto.sql.tree.ShowCatalogs;
Expand Down Expand Up @@ -136,6 +137,7 @@ private StatementUtils() {}
builder.put(DropFunction.class, QueryType.CONTROL);
builder.put(Use.class, QueryType.CONTROL);
builder.put(SetSession.class, QueryType.CONTROL);
builder.put(SetProperties.class, QueryType.DATA_DEFINITION);
builder.put(ResetSession.class, QueryType.CONTROL);
builder.put(StartTransaction.class, QueryType.CONTROL);
builder.put(Commit.class, QueryType.CONTROL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.statistics.TableStatistics;

Expand All @@ -28,9 +29,12 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;

public interface JdbcClient
{
default boolean schemaExists(ConnectorSession session, JdbcIdentity identity, String schema)
Expand Down Expand Up @@ -73,6 +77,11 @@ PreparedStatement buildSql(ConnectorSession session, Connection connection, Jdbc

void renameTable(ConnectorSession session, JdbcIdentity identity, JdbcTableHandle handle, SchemaTableName newTableName);

default void setTableProperties(ConnectorSession session, JdbcTableHandle tableHandle, Map<String, Object> properties)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support setting table properties");
}

void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);

JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static com.facebook.presto.plugin.clickhouse.ClickHouseEngineType.MERGETREE;
import static com.facebook.presto.plugin.clickhouse.ClickHouseErrorCode.JDBC_ERROR;
import static com.facebook.presto.plugin.clickhouse.ClickhouseDXLKeyWords.ORDER_BY_PROPERTY;
import static com.facebook.presto.plugin.clickhouse.ClickhouseDXLKeyWords.SAMPLE_BY_PROPERTY;
import static com.facebook.presto.plugin.clickhouse.StandardReadMappings.jdbcTypeToPrestoType;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
Expand Down Expand Up @@ -781,6 +782,24 @@ protected void renameTable(ClickHouseIdentity identity, String catalogName, Sche
}
}

public void setTableProperties(ClickHouseIdentity identity, ClickHouseTableHandle handle, Map<String, Object> properties)
{
checkArgument(properties.size() == 1 && properties.containsKey(SAMPLE_BY_PROPERTY), "Only support setting `sample_by` property");
ImmutableList.Builder<String> tableOptions = ImmutableList.builder();
ClickHouseTableProperties.getSampleBy(properties).ifPresent(value -> tableOptions.add("SAMPLE BY " + value));

try (Connection connection = connectionFactory.openConnection(identity)) {
String sql = format(
"ALTER TABLE %s MODIFY %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()),
join(" ", tableOptions.build()));
execute(connection, sql);
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

private String getColumnDefinitionSql(ColumnMetadata column, String columnName)
{
StringBuilder builder = new StringBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle table, Sc
clickHouseClient.renameTable(ClickHouseIdentity.from(session), tableHandle, newTableName);
}

public void setTableProperties(ConnectorSession session, ConnectorTableHandle table, Map<String, Object> properties)
{
ClickHouseTableHandle tableHandle = (ClickHouseTableHandle) table;
clickHouseClient.setTableProperties(ClickHouseIdentity.from(session), tableHandle, properties);
}

@Override
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<ConnectorTableLayoutHandle> tableLayoutHandle, List<ColumnHandle> columnHandles, Constraint<ColumnHandle> constraint)
{
Expand Down
14 changes: 12 additions & 2 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ Property Name Description
``format_version`` Optionally specifies the format version of the Iceberg
specification to use for new tables, either ``1`` or ``2``.
Defaults to ``1``.

``commit_retries`` Determines the number of attempts for committing the metadata
in case of concurrent upsert requests, before failing. The
default value is 4.
========================================= ===============================================================

The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``,
Expand Down Expand Up @@ -497,7 +501,7 @@ that is stored using the ORC file format, partitioned by ``ds`` and
partitioning = ARRAY['ds', 'country']
)

Create an Iceberg table with Iceberg format version 2::
Create an Iceberg table with Iceberg format version 2 and with commit_retries set to 5::

CREATE TABLE iceberg.web.page_views_v2 (
view_time timestamp,
Expand All @@ -509,7 +513,8 @@ Create an Iceberg table with Iceberg format version 2::
WITH (
format = 'ORC',
partitioning = ARRAY['ds', 'country'],
format_version = '2'
format_version = '2',
commit_retries = 5
)

Partition Column Transform
Expand Down Expand Up @@ -644,6 +649,11 @@ The table is partitioned by the transformed value of the column::

ALTER TABLE iceberg.web.page_views ADD COLUMN ts timestamp WITH (partitioning = 'hour');

Table properties can be modified for an iceberg table using ALTER TABLE SET PROPERTIES statement. Only `commit_retries` can be modified at present.
Comment thread
pratyakshsharma marked this conversation as resolved.
For example, to set commit_retries to 6 for table `iceberg.web.page_views_v2`, use the below statement::
Comment thread
pratyakshsharma marked this conversation as resolved.

ALTER TABLE iceberg.web.page_views_v2 SET PROPERTIES (commit_retries = 6);

TRUNCATE
^^^^^^^^

Expand Down
4 changes: 4 additions & 0 deletions presto-docs/src/main/sphinx/sql/alter-table.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ Rename column ``id`` to ``user_id`` in the ``users`` table if table ``users`` an

ALTER TABLE IF EXISTS users RENAME column IF EXISTS id to user_id;

Set table property (``x=y``) to table ``users``::

ALTER TABLE users SET PROPERTIES (x='y');

See Also
--------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import javax.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -95,6 +96,11 @@ public void checkCanCreateTable(ConnectorTransactionHandle transaction, Connecto
{
}

@Override
public void checkCanSetTableProperties(ConnectorTransactionHandle transactionHandle, ConnectorIdentity identity, AccessControlContext context, SchemaTableName tableName, Map<String, Object> properties)
{
}

@Override
public void checkCanDropTable(ConnectorTransactionHandle transaction, ConnectorIdentity identity, AccessControlContext context, SchemaTableName tableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import javax.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -72,6 +73,7 @@
import static com.facebook.presto.spi.security.AccessDeniedException.denySelectTable;
import static com.facebook.presto.spi.security.AccessDeniedException.denySetCatalogSessionProperty;
import static com.facebook.presto.spi.security.AccessDeniedException.denySetRole;
import static com.facebook.presto.spi.security.AccessDeniedException.denySetTableProperties;
import static com.facebook.presto.spi.security.AccessDeniedException.denyShowRoles;
import static com.facebook.presto.spi.security.AccessDeniedException.denyTruncateTable;
import static com.facebook.presto.spi.security.AccessDeniedException.denyUpdateTableColumns;
Expand Down Expand Up @@ -148,6 +150,15 @@ public void checkCanCreateTable(ConnectorTransactionHandle transaction, Connecto
}
}

@Override
public void checkCanSetTableProperties(ConnectorTransactionHandle transactionHandle, ConnectorIdentity identity, AccessControlContext context, SchemaTableName tableName, Map<String, Object> properties)
{
MetastoreContext metastoreContext = new MetastoreContext(identity, context.getQueryId().getId(), context.getClientInfo(), context.getSource(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER);
if (!isTableOwner(transactionHandle, identity, metastoreContext, tableName)) {
denySetTableProperties(tableName.toString());
}
}

@Override
public void checkCanDropTable(ConnectorTransactionHandle transaction, ConnectorIdentity identity, AccessControlContext context, SchemaTableName tableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -105,6 +107,7 @@
import static com.facebook.presto.iceberg.IcebergColumnHandle.primitiveIcebergColumnHandle;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.iceberg.IcebergTableProperties.CONCURRENCY_RETRIES;
import static com.facebook.presto.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.FORMAT_VERSION;
import static com.facebook.presto.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
Expand Down Expand Up @@ -791,6 +794,28 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
}
}

@Override
public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Object> properties)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
transaction = icebergTable.newTransaction();

UpdateProperties updateProperties = transaction.updateProperties();
for (Map.Entry<String, Object> entry : properties.entrySet()) {
switch (entry.getKey()) {
case CONCURRENCY_RETRIES:
updateProperties.set(TableProperties.COMMIT_NUM_RETRIES, String.valueOf(entry.getValue()));
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Updating property " + entry.getKey() + " is not supported currently");
}
}

updateProperties.commit();
transaction.commitTransaction();
}

/**
* Deletes all the files within a particular scan
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.session.PropertyMetadata;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.TableProperties;

import javax.inject.Inject;

Expand All @@ -26,6 +27,7 @@

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Locale.ENGLISH;
Expand All @@ -36,6 +38,7 @@ public class IcebergTableProperties
public static final String PARTITIONING_PROPERTY = "partitioning";
public static final String LOCATION_PROPERTY = "location";
public static final String FORMAT_VERSION = "format_version";
public static final String CONCURRENCY_RETRIES = "commit_retries";

private final List<PropertyMetadata<?>> tableProperties;
private final List<PropertyMetadata<?>> columnProperties;
Expand Down Expand Up @@ -74,6 +77,11 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
"Format version for the table",
null,
false))
.add(integerProperty(
CONCURRENCY_RETRIES,
"Determines the number of attempts in case of concurrent upserts",
TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
false))
.build();

columnProperties = ImmutableList.of(stringProperty(
Expand Down Expand Up @@ -114,4 +122,9 @@ public static String getFormatVersion(Map<String, Object> tableProperties)
{
return (String) tableProperties.get(FORMAT_VERSION);
}

public static Integer getConcurrencyRetries(Map<String, Object> tableProperties)
{
return (Integer) tableProperties.get(CONCURRENCY_RETRIES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.testing.assertions.Assert;
Expand Down Expand Up @@ -45,6 +46,7 @@
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -1121,4 +1123,38 @@ public void testTimestampPartitionedByHour()

dropTable(session, tableName);
}

@Test
public void testUpdatingInvalidProperty()
{
Session session = getSession();
String tableName = "test_invalid_property_update";
assertUpdate(session, "CREATE TABLE " + tableName + " (c1 integer, c2 varchar) WITH(commit_retries = 4)");
assertThatThrownBy(() -> assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES (format = 'PARQUET')"))
.hasMessage("Updating property format is not supported currently");
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testUpdatingRandomProperty()
{
Session session = getSession();
String tableName = "test_random_property_update";
assertUpdate(session, "CREATE TABLE " + tableName + " (c1 integer, c2 varchar) WITH(commit_retries = 4)");
assertThatThrownBy(() -> assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES (some_config = 2)"))
.hasMessage("Catalog 'iceberg' does not support table property 'some_config'");
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testUpdatingCommitRetries()
{
Session session = getSession();
String tableName = "test_commit_retries_update";
assertUpdate(session, "CREATE TABLE " + tableName + " (c1 integer, c2 varchar) WITH(commit_retries = 4)");
assertQuery("SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'commit.retry.num-retries'", "VALUES 4");
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES (commit_retries = 5)");
assertQuery("SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'commit.retry.num-retries'", "VALUES 5");
assertUpdate("DROP TABLE " + tableName);
}
}
Loading