Skip to content

Commit

Permalink
Introduce operation CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION (#27)
Browse files Browse the repository at this point in the history
This PR introduces a new operation, CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION, and requires it for createTableDirectWithWriteDelegation.

---------

Co-authored-by: Eric Maynard <[email protected]>
  • Loading branch information
eric-maynard and sfc-gh-emaynard authored Jul 31, 2024
1 parent 627dc60 commit 47d29e7
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public enum PolarisAuthorizableOperation {
UPDATE_NAMESPACE_PROPERTIES(NAMESPACE_WRITE_PROPERTIES),
LIST_TABLES(TABLE_LIST),
CREATE_TABLE_DIRECT(TABLE_CREATE),
CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION(EnumSet.of(TABLE_CREATE, TABLE_WRITE_DATA)),
CREATE_TABLE_STAGED(TABLE_CREATE),
CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION(EnumSet.of(TABLE_CREATE, TABLE_WRITE_DATA)),
REGISTER_TABLE(TABLE_CREATE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,8 @@ public LoadTableResponse createTableDirect(Namespace namespace, CreateTableReque

public LoadTableResponse createTableDirectWithWriteDelegation(
Namespace namespace, CreateTableRequest request) {
PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_DIRECT;
PolarisAuthorizableOperation op =
PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION;
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
op, TableIdentifier.of(namespace, request.name()));

Expand Down Expand Up @@ -591,20 +592,18 @@ public LoadTableResponse createTableDirectWithWriteDelegation(
LoadTableResponse.Builder responseBuilder =
LoadTableResponse.builder().withTableMetadata(tableMetadata);
if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
try {
Set<PolarisStorageActions> actionsRequested =
getValidTableActionsOrThrow(tableIdentifier);

LOG.atDebug()
.addKeyValue("tableIdentifier", tableIdentifier)
.addKeyValue("tableLocation", tableMetadata.location())
.log("Fetching client credentials for table");
responseBuilder.addAllConfig(
credentialDelegation.getCredentialConfig(
tableIdentifier, tableMetadata, actionsRequested));
} catch (ForbiddenException | NoSuchTableException e) {
// No privileges available
}
LOG.atDebug()
.addKeyValue("tableIdentifier", tableIdentifier)
.addKeyValue("tableLocation", tableMetadata.location())
.log("Fetching client credentials for table");
responseBuilder.addAllConfig(
credentialDelegation.getCredentialConfig(
tableIdentifier,
tableMetadata,
Set.of(
PolarisStorageActions.READ,
PolarisStorageActions.WRITE,
PolarisStorageActions.LIST)));
}
return responseBuilder.build();
} else if (table instanceof BaseMetadataTable) {
Expand Down Expand Up @@ -706,18 +705,13 @@ public LoadTableResponse createTableStagedWithWriteDelegation(
LoadTableResponse.builder().withTableMetadata(metadata);

if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
try {
Set<PolarisStorageActions> actionsRequested = getValidTableActionsOrThrow(ident);

LOG.atDebug()
.addKeyValue("tableIdentifier", ident)
.addKeyValue("tableLocation", metadata.location())
.log("Fetching client credentials for table");
responseBuilder.addAllConfig(
credentialDelegation.getCredentialConfig(ident, metadata, actionsRequested));
} catch (ForbiddenException | NoSuchTableException e) {
// No privileges available
}
LOG.atDebug()
.addKeyValue("tableIdentifier", ident)
.addKeyValue("tableLocation", metadata.location())
.log("Fetching client credentials for table");
responseBuilder.addAllConfig(
credentialDelegation.getCredentialConfig(
ident, metadata, Set.of(PolarisStorageActions.ALL)));
}
return responseBuilder.build();
});
Expand Down Expand Up @@ -779,39 +773,31 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier, String snaps
return doCatalogOperation(() -> CatalogHandlers.loadTable(baseCatalog, tableIdentifier));
}

private Set<PolarisStorageActions> getValidTableActionsOrThrow(TableIdentifier tableIdentifier) {
public LoadTableResponse loadTableWithAccessDelegation(
TableIdentifier tableIdentifier, String xIcebergAccessDelegation, String snapshots) {
// Here we have a single method that falls through multiple candidate
// PolarisAuthorizableOperations because instead of identifying the desired operation up-front
// and
// failing the authz check if grants aren't found, we find the first most-privileged authz match
// and respond according to that.
PolarisAuthorizableOperation read =
PolarisAuthorizableOperation.LOAD_TABLE_WITH_READ_DELEGATION;
PolarisAuthorizableOperation write =
PolarisAuthorizableOperation.LOAD_TABLE_WITH_WRITE_DELEGATION;

Set<PolarisStorageActions> actionsRequested =
new HashSet<>(Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST));
try {
// TODO: Refactor to have a boolean-return version of the helpers so we can fallthrough
// easily.
authorizeBasicTableLikeOperationOrThrow(write, PolarisEntitySubType.TABLE, tableIdentifier);
actionsRequested.add(PolarisStorageActions.WRITE);
} catch (ForbiddenException | NoSuchTableException e) {
LOG.atDebug()
.addKeyValue("tableIdentifier", tableIdentifier)
.log("Authz failed for LOAD_TABLE_WITH_WRITE_DELEGATION so attempting READ only");
} catch (ForbiddenException e) {
authorizeBasicTableLikeOperationOrThrow(read, PolarisEntitySubType.TABLE, tableIdentifier);
}
return actionsRequested;
}

public LoadTableResponse loadTableWithAccessDelegation(
TableIdentifier tableIdentifier, String xIcebergAccessDelegation, String snapshots) {
// Here we have a single method that falls through multiple candidate
// PolarisAuthorizableOperations because instead of identifying the desired operation up-front
// and
// failing the authz check if grants aren't found, we find the first most-privileged authz match
// and respond according to that.

// TODO: Find a way for the configuration or caller to better express whether to fail or omit
// when data-access is specified but access delegation grants are not found.
Set<PolarisStorageActions> actionsRequested = getValidTableActionsOrThrow(tableIdentifier);

return doCatalogOperation(
() -> {
Table table = baseCatalog.loadTable(tableIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,61 @@ public void testCreateTableDirectInsufficientPermissions() {
});
}

@Test
public void testCreateTableDirectWithWriteDelegationAllSufficientPrivileges() {
Assertions.assertThat(
adminService.grantPrivilegeOnCatalogToRole(
CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.TABLE_DROP))
.isTrue();
Assertions.assertThat(
adminService.grantPrivilegeOnCatalogToRole(
CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.TABLE_WRITE_DATA))
.isTrue();

final TableIdentifier newtable = TableIdentifier.of(NS2, "newtable");
final CreateTableRequest createDirectWithWriteDelegationRequest =
CreateTableRequest.builder().withName("newtable").withSchema(SCHEMA).stageCreate().build();

doTestSufficientPrivilegeSets(
List.of(
Set.of(PolarisPrivilege.TABLE_CREATE, PolarisPrivilege.TABLE_WRITE_DATA),
Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
() -> {
newWrapper(Set.of(PRINCIPAL_ROLE1))
.createTableDirectWithWriteDelegation(NS2, createDirectWithWriteDelegationRequest);
},
() -> {
newWrapper(Set.of(PRINCIPAL_ROLE2)).dropTableWithPurge(newtable);
},
PRINCIPAL_NAME);
}

@Test
public void testCreateTableDirectWithWriteDelegationInsufficientPermissions() {
final CreateTableRequest createDirectWithWriteDelegationRequest =
CreateTableRequest.builder()
.withName("directtable")
.withSchema(SCHEMA)
.stageCreate()
.build();

doTestInsufficientPrivileges(
List.of(
PolarisPrivilege.NAMESPACE_FULL_METADATA,
PolarisPrivilege.VIEW_FULL_METADATA,
PolarisPrivilege.TABLE_DROP,
PolarisPrivilege.TABLE_CREATE, // TABLE_CREATE itself is insufficient for delegation
PolarisPrivilege.TABLE_READ_PROPERTIES,
PolarisPrivilege.TABLE_WRITE_PROPERTIES,
PolarisPrivilege.TABLE_READ_DATA,
PolarisPrivilege.TABLE_WRITE_DATA,
PolarisPrivilege.TABLE_LIST),
() -> {
newWrapper(Set.of(PRINCIPAL_ROLE1))
.createTableDirectWithWriteDelegation(NS2, createDirectWithWriteDelegationRequest);
});
}

@Test
public void testCreateTableStagedAllSufficientPrivileges() {
Assertions.assertThat(
Expand Down
65 changes: 44 additions & 21 deletions regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from polaris.management import PolarisDefaultApi, Principal, PrincipalRole, CatalogRole, \
CatalogGrant, CatalogPrivilege, ApiException, CreateCatalogRoleRequest, CreatePrincipalRoleRequest, \
CreatePrincipalRequest, AddGrantRequest, GrantCatalogRoleRequest, GrantPrincipalRoleRequest
from polaris.management.exceptions import ForbiddenException


@pytest.fixture
Expand Down Expand Up @@ -742,8 +743,7 @@ def test_spark_credentials_s3_direct_without_write(root_client, snowflake_catalo
def test_spark_credentials_s3_direct_without_read(
snowflake_catalog, snowman_catalog_client, creator_catalog_client, test_bucket):
"""
Create a table using `creator`, which does not have TABLE_READ_DATA and ensure that credentials to read the table
are not vended.
Create a table using `creator`, which does not have TABLE_READ_DATA and expect a `ForbiddenException`
"""
snowman_catalog_client.create_namespace(
prefix=snowflake_catalog.name,
Expand All @@ -752,26 +752,23 @@ def test_spark_credentials_s3_direct_without_read(
)
)

response = creator_catalog_client.create_table(
prefix=snowflake_catalog.name,
namespace="some_schema",
x_iceberg_access_delegation="true",
create_table_request=CreateTableRequest(
name="some_table",
var_schema=ModelSchema(
type = 'struct',
fields = [],
)
)
)

assert not response.config
try:
creator_catalog_client.create_table(
prefix=snowflake_catalog.name,
namespace="some_schema",
x_iceberg_access_delegation="true",
create_table_request=CreateTableRequest(
name="some_table",
var_schema=ModelSchema(
type = 'struct',
fields = [],
)
)
)
pytest.fail("Expected exception when creating a table without TABLE_WRITE")
except Exception as e:
assert 'CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION' in str(e)

snowman_catalog_client.drop_table(
prefix=snowflake_catalog.name,
namespace="some_schema",
table="some_table"
)
snowman_catalog_client.drop_namespace(
prefix=snowflake_catalog.name,
namespace="some_schema"
Expand Down Expand Up @@ -883,6 +880,32 @@ def test_spark_credentials_s3_scoped_to_metadata_data_locations(root_client, sno
spark.sql('DROP NAMESPACE db1.schema')
spark.sql('DROP NAMESPACE db1')

@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true', reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_ctas(snowflake_catalog, polaris_catalog_url, snowman):
"""
Create a table using CTAS and ensure that credentials are vended
:param root_client:
:param snowflake_catalog:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
table_name = f'iceberg_test_table_{str(uuid.uuid4())[-10:]}'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('USE db1.schema')
spark.sql(f'CREATE TABLE {table_name}_t1 (col1 int)')
spark.sql('SHOW TABLES')

# Insert some data
spark.sql(f"INSERT INTO {table_name}_t1 VALUES (10)")

# Run CTAS
spark.sql(f"CREATE TABLE {table_name}_t2 AS SELECT * FROM {table_name}_t1")


def create_catalog_role(api, catalog, role_name):
catalog_role = CatalogRole(name=role_name)
try:
Expand Down

0 comments on commit 47d29e7

Please sign in to comment.