feat(plugin-iceberg): Add DDL statements for CREATE BRANCH#26898
feat(plugin-iceberg): Add DDL statements for CREATE BRANCH#26898tdcmeehan merged 2 commits intoprestodb:masterfrom
Conversation
Reviewer's GuideImplements full SQL, execution, metadata, and security plumbing for an ALTER TABLE ... CREATE BRANCH statement and wires it to Iceberg’s snapshot/branch APIs, including parser/AST support, metadata and access control hooks, and connector-level tests. Sequence diagram for ALTER TABLE ... CREATE BRANCH executionsequenceDiagram
actor User
participant PrestoCoordinator as PrestoCoordinator
participant Parser as SqlParser
participant Analyzer as StatementAnalyzer
participant Binder as PrestoDataDefBindingHelper
participant Task as CreateBranchTask
participant AC as AccessControlManager
participant Meta as MetadataManager
participant ConnMeta as IcebergAbstractMetadata
participant IcebergTable as IcebergTable
User->>PrestoCoordinator: ALTER TABLE iceberg.default.mytable CREATE BRANCH 'audit-branch' ...
PrestoCoordinator->>Parser: parse(sql)
Parser-->>PrestoCoordinator: CreateBranch AST
PrestoCoordinator->>Analyzer: analyze(CreateBranch)
Analyzer-->>PrestoCoordinator: analyzed statement
PrestoCoordinator->>Binder: resolve DDL task for CreateBranch
Binder-->>PrestoCoordinator: CreateBranchTask
PrestoCoordinator->>Task: execute(CreateBranch, session, metadata, accessControl, ...)
Task->>Meta: getMetadataResolver(session).getTableHandle(tableName)
Meta-->>Task: Optional<TableHandle>
Task->>Meta: getMetadataResolver(session).getMaterializedView(tableName)
Meta-->>Task: Optional<MaterializedViewDefinition>
Note right of Task: If materialized view present, throw NOT_SUPPORTED
Task->>AC: checkCanCreateBranch(transactionId, identity, context, tableName)
AC->>AC: authenticationCheck / authorizationCheck
AC->>AC: systemAccessControl.checkCanCreateBranch(...)
AC->>AC: connectorAccessControl.checkCanCreateBranch(...)
AC-->>Task: success or AccessDeniedException
alt AS OF TIMESTAMP specified
Task->>Task: analyze timestamp expression
Task->>Task: evaluateConstantExpression
Task->>Task: convert to asOfTimestampMillis
end
Task->>Meta: createBranch(session, tableHandle, branchName, snapshotId, asOfTimestampMillis, retainDays, minSnapshotsToKeep, maxSnapshotAgeDays)
Meta->>ConnMeta: createBranch(connectorSession, connectorTableHandle, ...)
ConnMeta->>IcebergTable: getIcebergTable(schemaTableName)
IcebergTable-->>ConnMeta: Table
ConnMeta->>ConnMeta: validate table type is DATA
ConnMeta->>ConnMeta: check branch does not already exist
ConnMeta->>IcebergTable: manageSnapshots()
IcebergTable-->>ConnMeta: ManageSnapshots
alt snapshotId present
ConnMeta->>IcebergTable: createBranch(branchName, snapshotId)
else asOfTimestampMillis present
ConnMeta->>IcebergTable: iterate snapshots() to find snapshot <= asOfTimestampMillis
ConnMeta->>IcebergTable: createBranch(branchName, targetSnapshotId)
else no snapshot hints
ConnMeta->>IcebergTable: currentSnapshot().snapshotId()
ConnMeta->>IcebergTable: createBranch(branchName, currentSnapshotId)
end
opt retention options
ConnMeta->>IcebergTable: setMaxRefAgeMs(branchName, retainMs)
ConnMeta->>IcebergTable: setMinSnapshotsToKeep(branchName, minSnapshotsToKeep)
ConnMeta->>IcebergTable: setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs)
end
ConnMeta->>IcebergTable: commit()
IcebergTable-->>ConnMeta: branch created
ConnMeta-->>Meta: success
Meta-->>Task: success
Task-->>PrestoCoordinator: immediateFuture(null)
PrestoCoordinator-->>User: command completed
Class diagram for CreateBranch AST statementclassDiagram
class Statement {
}
class Node {
}
class QualifiedName {
}
class Expression {
}
class CreateBranch {
- QualifiedName tableName
- String branchName
- Optional~Long~ snapshotId
- Optional~Expression~ asOfTimestamp
- Optional~Long~ retainDays
- Optional~Integer~ minSnapshotsToKeep
- Optional~Long~ maxSnapshotAgeDays
+ CreateBranch(QualifiedName tableName, String branchName, Optional~Long~ snapshotId, Optional~Expression~ asOfTimestamp, Optional~Long~ retainDays, Optional~Integer~ minSnapshotsToKeep, Optional~Long~ maxSnapshotAgeDays)
+ CreateBranch(NodeLocation location, QualifiedName tableName, String branchName, Optional~Long~ snapshotId, Optional~Expression~ asOfTimestamp, Optional~Long~ retainDays, Optional~Integer~ minSnapshotsToKeep, Optional~Long~ maxSnapshotAgeDays)
+ getTableName() QualifiedName
+ getBranchName() String
+ getSnapshotId() Optional~Long~
+ getAsOfTimestamp() Optional~Expression~
+ getRetainDays() Optional~Long~
+ getMinSnapshotsToKeep() Optional~Integer~
+ getMaxSnapshotAgeDays() Optional~Long~
+ accept(AstVisitor visitor, Object context) Object
+ getChildren() List~Node~
+ equals(Object o) boolean
+ hashCode() int
+ toString() String
}
class AstVisitor {
+ visitCreateBranch(CreateBranch node, Object context) Object
}
Statement <|-- CreateBranch
Node <|-- Statement
Expression <|-- Node
QualifiedName <|-- Node
AstVisitor <.. CreateBranch : accept
Class diagram for CreateBranchTask and metadata/access control integrationclassDiagram
class DDLDefinitionTask~T~ {
<<interface>>
+ getName() String
+ execute(T statement, TransactionManager transactionManager, Metadata metadata, AccessControl accessControl, Session session, List~Expression~ parameters, WarningCollector warningCollector, String query) ListenableFuture
}
class CreateBranchTask {
+ getName() String
+ execute(CreateBranch statement, TransactionManager transactionManager, Metadata metadata, AccessControl accessControl, Session session, List~Expression~ parameters, WarningCollector warningCollector, String query) ListenableFuture
}
class Metadata {
+ getMetadataResolver(Session session) MetadataResolver
+ createBranch(Session session, TableHandle tableHandle, String branchName, Optional~Long~ snapshotId, Optional~Long~ asOfTimestampMillis, Optional~Long~ retainDays, Optional~Integer~ minSnapshotsToKeep, Optional~Long~ maxSnapshotAgeDays) void
}
class MetadataResolver {
+ getTableHandle(QualifiedObjectName tableName) Optional~TableHandle~
+ getMaterializedView(QualifiedObjectName tableName) Optional~MaterializedViewDefinition~
}
class AccessControl {
+ checkCanCreateBranch(TransactionId transactionId, Identity identity, AccessControlContext context, QualifiedObjectName tableName) void
}
class AccessControlManager {
+ checkCanCreateBranch(TransactionId transactionId, Identity identity, AccessControlContext context, QualifiedObjectName tableName) void
}
class SystemAccessControl {
+ checkCanCreateBranch(Identity identity, AccessControlContext context, CatalogSchemaTableName table) void
}
class ConnectorAccessControl {
+ checkCanCreateBranch(ConnectorTransactionHandle transactionHandle, ConnectorIdentity identity, AccessControlContext context, SchemaTableName tableName) void
}
class IcebergAbstractMetadata {
+ createBranch(ConnectorSession session, ConnectorTableHandle tableHandle, String branchName, Optional~Long~ snapshotId, Optional~Long~ asOfTimestampMillis, Optional~Long~ retainDays, Optional~Integer~ minSnapshotsToKeep, Optional~Long~ maxSnapshotAgeDays) void
}
class ConnectorMetadata {
+ createBranch(ConnectorSession session, ConnectorTableHandle tableHandle, String branchName, Optional~Long~ snapshotId, Optional~Long~ asOfTimestampMillis, Optional~Long~ retainDays, Optional~Integer~ minSnapshotsToKeep, Optional~Long~ maxSnapshotAgeDays) void
}
class MetadataManager {
+ createBranch(Session session, TableHandle tableHandle, String branchName, Optional~Long~ snapshotId, Optional~Long~ asOfTimestampMillis, Optional~Long~ retainDays, Optional~Integer~ minSnapshotsToKeep, Optional~Long~ maxSnapshotAgeDays) void
}
class ClassLoaderSafeConnectorMetadata {
+ createBranch(ConnectorSession session, ConnectorTableHandle tableHandle, String branchName, Optional~Long~ snapshotId, Optional~Long~ asOfTimestampMillis, Optional~Long~ retainDays, Optional~Integer~ minSnapshotsToKeep, Optional~Long~ maxSnapshotAgeDays) void
}
class DelegatingMetadataManager {
+ createBranch(Session session, TableHandle tableHandle, String branchName, Optional~Long~ snapshotId, Optional~Long~ asOfTimestampMillis, Optional~Long~ retainDays, Optional~Integer~ minSnapshotsToKeep, Optional~Long~ maxSnapshotAgeDays) void
}
DDLDefinitionTask~CreateBranch~ <|.. CreateBranchTask
CreateBranchTask --> Metadata : uses
CreateBranchTask --> AccessControl : uses
MetadataManager ..|> Metadata
MetadataManager --> ConnectorMetadata : delegates
ConnectorMetadata <|.. IcebergAbstractMetadata
ConnectorMetadata <|.. ClassLoaderSafeConnectorMetadata
AccessControlManager ..|> AccessControl
AccessControlManager --> SystemAccessControl : systemAccessControl
AccessControlManager --> ConnectorAccessControl : connector access
DelegatingMetadataManager ..|> Metadata
DelegatingMetadataManager --> Metadata : delegate
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Codenotify: Notifying subscribers in CODENOTIFY files for diff 18c15e2...bcd320b.
|
There was a problem hiding this comment.
Hey - I've found 5 issues, and left some high level feedback:
- In ForwardingConnectorAccessControl.checkCanCreateBranch you are delegating to checkCanDropBranch instead of checkCanCreateBranch, which will route create-branch authorization checks to the wrong method.
- CreateBranchTask assumes the table handle is present (tableHandleOptional.get()) and ignores the optional IF EXISTS in the grammar; consider aligning behavior with DROP BRANCH by returning a user-facing error when the table is missing instead of risking a NullPointerException.
- The CREATE BRANCH grammar allows specifying both FOR SYSTEM_VERSION and FOR SYSTEM_TIME in the same statement, and SqlFormatter will print both; if only one is intended to be supported at a time, it would be better to enforce mutual exclusivity in the parser or analyzer and/or raise a semantic error when both are provided.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In ForwardingConnectorAccessControl.checkCanCreateBranch you are delegating to checkCanDropBranch instead of checkCanCreateBranch, which will route create-branch authorization checks to the wrong method.
- CreateBranchTask assumes the table handle is present (tableHandleOptional.get()) and ignores the optional IF EXISTS in the grammar; consider aligning behavior with DROP BRANCH by returning a user-facing error when the table is missing instead of risking a NullPointerException.
- The CREATE BRANCH grammar allows specifying both FOR SYSTEM_VERSION and FOR SYSTEM_TIME in the same statement, and SqlFormatter will print both; if only one is intended to be supported at a time, it would be better to enforce mutual exclusivity in the parser or analyzer and/or raise a semantic error when both are provided.
## Individual Comments
### Comment 1
<location> `presto-plugin-toolkit/src/main/java/com/facebook/presto/plugin/base/security/ForwardingConnectorAccessControl.java:282-278` </location>
<code_context>
}
+ @Override
+ public void checkCanCreateBranch(ConnectorTransactionHandle transactionHandle, ConnectorIdentity identity, AccessControlContext context, SchemaTableName tableName)
+ {
+ }
</code_context>
<issue_to_address>
**🚨 issue (security):** checkCanCreateBranch delegates to checkCanDropBranch instead of the corresponding create-branch method.
The new `checkCanCreateBranch` override is delegating to `checkCanDropBranch(...)`, so create-branch authorization will incorrectly follow drop-branch policies. Please delegate to `delegate().checkCanCreateBranch(transactionHandle, identity, context, tableName)` instead.
</issue_to_address>
### Comment 2
<location> `presto-main-base/src/main/java/com/facebook/presto/execution/CreateBranchTask.java:60-69` </location>
<code_context>
+ Optional<TableHandle> tableHandleOptional = metadata.getMetadataResolver(session).getTableHandle(tableName);
</code_context>
<issue_to_address>
**issue (bug_risk):** Accessing tableHandleOptional without presence check can lead to NPE instead of a semantic error when the table does not exist.
`getTableHandle` returns an empty Optional when the table is missing, so calling `tableHandleOptional.get()` can cause an NPE. Instead, check `isPresent()` and, if empty, throw a `SemanticException` (e.g., `TABLE_NOT_FOUND`) before invoking `metadata.createBranch` to match behavior of other DDL operations.
</issue_to_address>
### Comment 3
<location> `presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorAccessControl.java:440-445` </location>
<code_context>
denyDropBranch(tableName.toString());
}
+ /**
+ * Check if identity is allowed to drop branch from the specified table in this catalog.
+ *
+ * @throws com.facebook.presto.spi.security.AccessDeniedException if not allowed
</code_context>
<issue_to_address>
**nitpick (typo):** Javadoc for checkCanCreateBranch mistakenly refers to dropping a branch.
Please update the Javadoc above `checkCanCreateBranch` to say "create branch" instead of "drop branch" so it matches the method name and intent.
```suggestion
/**
* Check if identity is allowed to create branch for the specified table in this catalog.
*
* @throws com.facebook.presto.spi.security.AccessDeniedException if not allowed
*/
default void checkCanCreateBranch(ConnectorTransactionHandle transactionHandle, ConnectorIdentity identity, AccessControlContext context, SchemaTableName tableName)
```
</issue_to_address>
### Comment 4
<location> `presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergCreateBranch.java:68-77` </location>
<code_context>
+ assertUpdate(session, "ALTER TABLE test_table_for_branch DROP BRANCH 'version_branch'");
+ }
+
+ @Test
+ public void testCreateBranchFromTimestamp()
+ {
+ ZonedDateTime committedAt = (ZonedDateTime) computeScalar(session, "SELECT committed_at FROM \"test_table_for_branch$snapshots\" ORDER BY committed_at DESC LIMIT 1");
+ DateTimeFormatter prestoTimestamp = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS XXX");
+ String timestampLiteral = committedAt.format(prestoTimestamp);
+ assertUpdate(session, format("ALTER TABLE test_table_for_branch CREATE BRANCH 'time_branch' FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", timestampLiteral));
+ assertQuery(session, "SELECT count(*) FROM test_table_for_branch FOR SYSTEM_VERSION AS OF 'time_branch'", "VALUES 2");
+ assertUpdate(session, "ALTER TABLE test_table_for_branch DROP BRANCH 'time_branch'");
+ }
+
</code_context>
<issue_to_address>
**issue (testing):** testCreateBranchFromTimestamp can become flaky because it depends on the latest snapshot, which may include rows inserted by other tests
This test derives the timestamp via `SELECT committed_at ... ORDER BY committed_at DESC LIMIT 1` and then expects `VALUES 2`. If `testCreateBranchFromVersion` (or future tests) add more rows to `test_table_for_branch`, the latest snapshot may have 3+ rows and make this test order-dependent and flaky. Please make it self-contained (e.g., insert and use a dedicated snapshot/timestamp for this test) or assert against specific snapshot content/ID instead of a fixed row count. The `@Test(singleThreaded = true)` annotation doesn’t remove the brittleness of relying on method order.
</issue_to_address>
### Comment 5
<location> `presto-main-base/src/test/java/com/facebook/presto/security/TestAccessControlManager.java:598-599` </location>
<code_context>
{
}
+ @Override
+ public void checkCanCreateBranch(ConnectorTransactionHandle transactionHandle, ConnectorIdentity identity, AccessControlContext context, SchemaTableName tableName)
+ {
+ }
</code_context>
<issue_to_address>
**suggestion (testing):** New checkCanCreateBranch access-control path is not exercised by tests
This test-only `ConnectorAccessControl` now overrides `checkCanCreateBranch`, but there’s no test verifying that `AccessControlManager.checkCanCreateBranch` delegates to it (or to `SystemAccessControl`). To match `checkCanDropBranch` and similar checks, please add a test that calls `AccessControlManager.checkCanCreateBranch` for a table in a catalog wired to this `ConnectorAccessControl` and asserts the call is routed correctly (e.g., by expecting `UnsupportedOperationException` or using a test double that records the invocation).
Suggested implementation:
```java
@Override
public void checkCanCreateBranch(ConnectorTransactionHandle transactionHandle, ConnectorIdentity identity, AccessControlContext context, SchemaTableName tableName)
{
throw new UnsupportedOperationException();
}
@Override
public void checkCanDropTag(ConnectorTransactionHandle transactionHandle, ConnectorIdentity identity, AccessControlContext context, SchemaTableName tableName)
{
```
Please also add a new test method to `TestAccessControlManager` to verify delegation of `checkCanCreateBranch`. The shape should mirror the existing delegation tests for similar methods such as `checkCanDropBranch` (if present). For example:
```java
@Test
public void testCheckCanCreateBranchDelegatedToConnectorAccessControl()
{
// Arrange: create a connector access control that throws on checkCanCreateBranch
ConnectorAccessControl throwingConnectorAccessControl = new ConnectorAccessControl()
{
@Override
public void checkCanCreateBranch(
ConnectorTransactionHandle transactionHandle,
ConnectorIdentity identity,
AccessControlContext context,
SchemaTableName tableName)
{
throw new UnsupportedOperationException("createBranch delegated to connector");
}
// implement / delegate all other methods as in the existing test ConnectorAccessControl
// (you can extend the existing inner test class instead of re-implementing everything)
};
// Wire this connector access control into the AccessControlManager for a test catalog.
// Prefer to follow the pattern used by the existing delegation tests in this class, e.g.:
//
// AccessControlManager accessControlManager = createAccessControlManager(
// systemAccessControl,
// ImmutableMap.of(TEST_CATALOG, throwingConnectorAccessControl));
//
// or whatever factory/helper is already used in this test to register connector access controls.
// Act + Assert: calling AccessControlManager.checkCanCreateBranch must hit the connector and throw
SchemaTableName testTable = new SchemaTableName("test_schema", "test_table");
assertThatThrownBy(() -> accessControlManager.checkCanCreateBranch(
TRANSACTION_HANDLE,
TEST_IDENTITY,
new AccessControlContext(Optional.of(TEST_CATALOG)),
testTable))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("createBranch delegated to connector");
}
```
You will need to:
1. Place this test method in the main `TestAccessControlManager` class, alongside other delegation tests (e.g., the ones for `checkCanDropBranch`, `checkCanCreateTable`, etc.).
2. Reuse existing fixtures/constants (`accessControlManager`, `TRANSACTION_HANDLE`, `TEST_IDENTITY`, `TEST_CATALOG`, `AccessControlContext`, etc.) to match the established patterns in this file.
3. Instead of creating a new anonymous `ConnectorAccessControl`, you can also:
* Extend the existing test-only `ConnectorAccessControl` inner class already in this file, overriding only `checkCanCreateBranch` to throw `UnsupportedOperationException`, and
* Wire that subclass into the `AccessControlManager` in the same way as the other tests do.
Adjust the helper/factory calls and constant names to align with what already exists in `TestAccessControlManager`.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
...src/main/java/com/facebook/presto/plugin/base/security/ForwardingConnectorAccessControl.java
Show resolved
Hide resolved
| Optional<TableHandle> tableHandleOptional = metadata.getMetadataResolver(session).getTableHandle(tableName); | ||
|
|
||
| Optional<MaterializedViewDefinition> optionalMaterializedView = metadata.getMetadataResolver(session).getMaterializedView(tableName); | ||
| if (optionalMaterializedView.isPresent()) { | ||
| throw new SemanticException(NOT_SUPPORTED, statement, "'%s' is a materialized view, and create branch is not supported", tableName); | ||
| } | ||
|
|
||
| getConnectorIdOrThrow(session, metadata, tableName.getCatalogName()); | ||
| accessControl.checkCanCreateBranch(session.getRequiredTransactionId(), session.getIdentity(), session.getAccessControlContext(), tableName); | ||
|
|
There was a problem hiding this comment.
issue (bug_risk): Accessing tableHandleOptional without presence check can lead to NPE instead of a semantic error when the table does not exist.
getTableHandle returns an empty Optional when the table is missing, so calling tableHandleOptional.get() can cause an NPE. Instead, check isPresent() and, if empty, throw a SemanticException (e.g., TABLE_NOT_FOUND) before invoking metadata.createBranch to match behavior of other DDL operations.
presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorAccessControl.java
Show resolved
Hide resolved
| @Test | ||
| public void testCreateBranchBasic() | ||
| { | ||
| assertUpdate(session, "ALTER TABLE test_table_for_branch CREATE BRANCH 'test_branch'"); | ||
| assertQuery(session, "SELECT count(*) FROM \"test_table_for_branch$refs\" where name = 'test_branch' and type = 'BRANCH'", "VALUES 1"); | ||
| assertQuery(session, "SELECT count(*) FROM test_table_for_branch FOR SYSTEM_VERSION AS OF 'test_branch'", "VALUES 2"); | ||
| assertUpdate(session, "ALTER TABLE test_table_for_branch DROP BRANCH 'test_branch'"); | ||
| } | ||
|
|
||
| @Test |
There was a problem hiding this comment.
issue (testing): testCreateBranchFromTimestamp can become flaky because it depends on the latest snapshot, which may include rows inserted by other tests
This test derives the timestamp via SELECT committed_at ... ORDER BY committed_at DESC LIMIT 1 and then expects VALUES 2. If testCreateBranchFromVersion (or future tests) add more rows to test_table_for_branch, the latest snapshot may have 3+ rows and make this test order-dependent and flaky. Please make it self-contained (e.g., insert and use a dedicated snapshot/timestamp for this test) or assert against specific snapshot content/ID instead of a fixed row count. The @Test(singleThreaded = true) annotation doesn’t remove the brittleness of relying on method order.
61972b9 to
8b7a908
Compare
|
Should the documentation for ALTER TABLE be updated? |
8b7a908 to
2c0cfa1
Compare
Thanks for your review @steveburnett , yes I missed the document earlier. Added now, please check |
steveburnett
left a comment
There was a problem hiding this comment.
LGTM! (docs)
Pull updated branch, new local doc build. Looks good, thank you!
2c0cfa1 to
14a9dbc
Compare
hantangwangd
left a comment
There was a problem hiding this comment.
@agrawalreetika thanks for this feature. I've left a couple of initial comments about the syntax.
| | ALTER TABLE (IF EXISTS)? tableName=qualifiedName | ||
| CREATE BRANCH name=string | ||
| (FOR SYSTEM_VERSION AS OF version=INTEGER_VALUE)? | ||
| (FOR SYSTEM_TIME AS OF timestamp=expression)? |
There was a problem hiding this comment.
It appears the current syntax allows both SYSTEM_VERSION and SYSTEM_TIME to be specified in the same CREATE BRANCH statement. For example:
alter table test_table create branch 'test_branch'
for system_version as of 7859886718534873735
for system_time as of timestamp '2026-01-07 22:56:30.866';
Should we restrict this so that only one of them can be used in a single CREATE BRANCH statement?
There was a problem hiding this comment.
Thanks @hantangwangd, yes, even I think restriction would be better
There was a problem hiding this comment.
Hi @agrawalreetika, I just noticed that we already have a tableVersionExpression syntax specifically designed for specifying table versions, which offers greater expressiveness and robustness. Should we directly adopt it for table version specification? This way, we could also reuse the logic from IcebergAbstractMetadata.getSnapshotIdForTableVersion(...) to retrieve the specific snapshot ID. What do you think about this?
There was a problem hiding this comment.
Yeah that's a better idea. I missed on it. Updated grammar changes.
| | ALTER TABLE (IF EXISTS)? tableName=qualifiedName | ||
| SET PROPERTIES properties #setTableProperties | ||
| | ALTER TABLE (IF EXISTS)? tableName=qualifiedName | ||
| CREATE BRANCH name=string |
There was a problem hiding this comment.
I notice both spark and trino support the syntax CREATE OR REPLACE BRANCH and CREATE BRANCH IF NOT EXISTS. Do you think we should support them as well?
There was a problem hiding this comment.
I think it would be better, added.
14a9dbc to
19954ef
Compare
Thanks for your review @hantangwangd I have updated the PR. Please take a look when you get a chance. |
19954ef to
55c64ab
Compare
hantangwangd
left a comment
There was a problem hiding this comment.
@agrawalreetika thanks for the fix. Mostly little things and nits, otherwise looks great to me!
| if (replace && branchExists) { | ||
| icebergTable.manageSnapshots().removeBranch(branchName).commit(); | ||
| } | ||
| else if (branchExists) { | ||
| throw new PrestoException(ALREADY_EXISTS, format("Branch %s already exists in table %s", branchName, icebergTableHandle.getSchemaTableName().getTableName())); | ||
| } | ||
|
|
||
| org.apache.iceberg.ManageSnapshots manageSnapshots = icebergTable.manageSnapshots(); | ||
| org.apache.iceberg.ManageSnapshots createBranchOp; | ||
|
|
||
| if (tableVersion.isPresent()) { | ||
| long targetSnapshotId = getSnapshotIdForTableVersion(icebergTable, tableVersion.get()); | ||
| createBranchOp = manageSnapshots.createBranch(branchName, targetSnapshotId); | ||
| } | ||
| else { | ||
| if (icebergTable.currentSnapshot() == null) { | ||
| throw new PrestoException(NOT_FOUND, format("Table %s has no current snapshot", icebergTableHandle.getSchemaTableName().getTableName())); | ||
| } | ||
| createBranchOp = manageSnapshots.createBranch(branchName, icebergTable.currentSnapshot().snapshotId()); | ||
| } | ||
|
|
||
| // Apply retention policies if specified | ||
| if (retainDays.isPresent()) { | ||
| long retainMs = java.time.Duration.ofDays(retainDays.get()).toMillis(); | ||
| createBranchOp = createBranchOp.setMaxRefAgeMs(branchName, retainMs); | ||
| } | ||
| if (minSnapshotsToKeep.isPresent()) { | ||
| createBranchOp = createBranchOp.setMinSnapshotsToKeep(branchName, minSnapshotsToKeep.get()); | ||
| } | ||
| if (maxSnapshotAgeDays.isPresent()) { | ||
| long maxAgeMs = java.time.Duration.ofDays(maxSnapshotAgeDays.get()).toMillis(); | ||
| createBranchOp = createBranchOp.setMaxSnapshotAgeMs(branchName, maxAgeMs); | ||
| } | ||
| createBranchOp.commit(); |
There was a problem hiding this comment.
| if (replace && branchExists) { | |
| icebergTable.manageSnapshots().removeBranch(branchName).commit(); | |
| } | |
| else if (branchExists) { | |
| throw new PrestoException(ALREADY_EXISTS, format("Branch %s already exists in table %s", branchName, icebergTableHandle.getSchemaTableName().getTableName())); | |
| } | |
| org.apache.iceberg.ManageSnapshots manageSnapshots = icebergTable.manageSnapshots(); | |
| org.apache.iceberg.ManageSnapshots createBranchOp; | |
| if (tableVersion.isPresent()) { | |
| long targetSnapshotId = getSnapshotIdForTableVersion(icebergTable, tableVersion.get()); | |
| createBranchOp = manageSnapshots.createBranch(branchName, targetSnapshotId); | |
| } | |
| else { | |
| if (icebergTable.currentSnapshot() == null) { | |
| throw new PrestoException(NOT_FOUND, format("Table %s has no current snapshot", icebergTableHandle.getSchemaTableName().getTableName())); | |
| } | |
| createBranchOp = manageSnapshots.createBranch(branchName, icebergTable.currentSnapshot().snapshotId()); | |
| } | |
| // Apply retention policies if specified | |
| if (retainDays.isPresent()) { | |
| long retainMs = java.time.Duration.ofDays(retainDays.get()).toMillis(); | |
| createBranchOp = createBranchOp.setMaxRefAgeMs(branchName, retainMs); | |
| } | |
| if (minSnapshotsToKeep.isPresent()) { | |
| createBranchOp = createBranchOp.setMinSnapshotsToKeep(branchName, minSnapshotsToKeep.get()); | |
| } | |
| if (maxSnapshotAgeDays.isPresent()) { | |
| long maxAgeMs = java.time.Duration.ofDays(maxSnapshotAgeDays.get()).toMillis(); | |
| createBranchOp = createBranchOp.setMaxSnapshotAgeMs(branchName, maxAgeMs); | |
| } | |
| createBranchOp.commit(); | |
| long targetSnapshotId = tableVersion.map(version -> getSnapshotIdForTableVersion(icebergTable, version)) | |
| .orElseGet(() -> { | |
| if (icebergTable.currentSnapshot() == null) { | |
| throw new PrestoException(NOT_FOUND, format("Table %s has no current snapshot", icebergTableHandle.getSchemaTableName().getTableName())); | |
| } | |
| return icebergTable.currentSnapshot().snapshotId(); | |
| }); | |
| ManageSnapshots manageSnapshots = icebergTable.manageSnapshots(); | |
| if (replace && branchExists) { | |
| manageSnapshots.replaceBranch(branchName, targetSnapshotId); | |
| } | |
| else if (!branchExists) { | |
| manageSnapshots.createBranch(branchName, targetSnapshotId); | |
| } | |
| else { | |
| throw new PrestoException(ALREADY_EXISTS, format("Branch %s already exists in table %s", branchName, icebergTableHandle.getSchemaTableName().getTableName())); | |
| } | |
| // Apply retention policies if specified | |
| retainDays.ifPresent(retainDs -> manageSnapshots.setMaxRefAgeMs(branchName, ofDays(retainDs).toMillis())); | |
| minSnapshotsToKeep.ifPresent(minSnapshots -> manageSnapshots.setMinSnapshotsToKeep(branchName, minSnapshots)); | |
| maxSnapshotAgeDays.ifPresent(maxAgeDays -> manageSnapshots.setMaxSnapshotAgeMs(branchName, ofDays(maxAgeDays).toMillis())); | |
| manageSnapshots.commit(); |
nit: we may want to use replaceBranch rather than dropBranch&createBranch, and handle all the actions in a single commit.
| session, | ||
| parameterLookup, | ||
| WarningCollector.NOOP); | ||
| analyzer.analyze(stateExpr, com.facebook.presto.sql.analyzer.Scope.create()); |
There was a problem hiding this comment.
| analyzer.analyze(stateExpr, com.facebook.presto.sql.analyzer.Scope.create()); | |
| analyzer.analyze(stateExpr, Scope.create()); |
presto-main-base/src/main/java/com/facebook/presto/metadata/DelegatingMetadataManager.java
Show resolved
Hide resolved
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
Show resolved
Hide resolved
presto-main-base/src/main/java/com/facebook/presto/metadata/Metadata.java
Show resolved
Hide resolved
| final SystemAccessControlStats checkCanDropTag = new SystemAccessControlStats(); | ||
| final SystemAccessControlStats checkCanDropConstraint = new SystemAccessControlStats(); | ||
| final SystemAccessControlStats checkCanAddConstraint = new SystemAccessControlStats(); | ||
| final SystemAccessControlStats checkCanCreateBranch = new SystemAccessControlStats(); |
There was a problem hiding this comment.
nit: should we add the managed getter method for this new field?
presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java
Show resolved
Hide resolved
| ALTER (COLUMN)? column=identifier DROP NOT NULL #alterColumnDropNotNull | ||
| | ALTER TABLE (IF EXISTS)? tableName=qualifiedName | ||
| SET PROPERTIES properties #setTableProperties | ||
| | ALTER TABLE (IF EXISTS)? tableName=qualifiedName |
There was a problem hiding this comment.
From what I can see, the (IF EXISTS)? expression for the target table doesn't appear to be used. Could we remove it directly?
There was a problem hiding this comment.
I have added checks for this
presto-parser/src/main/java/com/facebook/presto/sql/parser/AstBuilder.java
Show resolved
Hide resolved
| public void testCreateBranchFromVersion() | ||
| { | ||
| assertUpdate(session, "INSERT INTO test_table_for_branch VALUES (3, 'Charlie')", 1); | ||
| long snapshotId = (Long) computeScalar(session, "SELECT snapshot_id FROM \"test_table_for_branch$snapshots\" ORDER BY committed_at DESC LIMIT 1"); | ||
| assertUpdate(session, format("ALTER TABLE test_table_for_branch CREATE BRANCH 'version_branch' FOR SYSTEM_VERSION AS OF %d", snapshotId)); | ||
| assertQuery(session, "SELECT count(*) FROM test_table_for_branch FOR SYSTEM_VERSION AS OF 'version_branch'", "VALUES 3"); | ||
| assertUpdate(session, "ALTER TABLE test_table_for_branch DROP BRANCH 'version_branch'"); | ||
| } | ||
|
|
||
| @Test | ||
| public void testCreateBranchFromTimestamp() | ||
| { | ||
| ZonedDateTime committedAt = (ZonedDateTime) computeScalar(session, "SELECT committed_at FROM \"test_table_for_branch$snapshots\" ORDER BY committed_at DESC LIMIT 1"); | ||
| DateTimeFormatter prestoTimestamp = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS XXX"); | ||
| String timestampLiteral = committedAt.format(prestoTimestamp); | ||
| assertUpdate(session, format("ALTER TABLE test_table_for_branch CREATE BRANCH 'time_branch' FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", timestampLiteral)); | ||
| assertQuery(session, "SELECT count(*) FROM test_table_for_branch FOR SYSTEM_VERSION AS OF 'time_branch'", "VALUES 2"); | ||
| assertUpdate(session, "ALTER TABLE test_table_for_branch DROP BRANCH 'time_branch'"); | ||
| } |
There was a problem hiding this comment.
I noticed the test logic relies on the execution order of test methods. However, as I know, TestNG does not guarantee order by default, which may lead to flaky across platforms or JDK versions.
Should we use a more robust way here? What are your thoughts?
There was a problem hiding this comment.
sure, updated tests
55c64ab to
bcd320b
Compare
Thanks for your review @hantangwangd, I have adressed your comments. |
hantangwangd
left a comment
There was a problem hiding this comment.
Thanks @agrawalreetika, lgtm!
…Manager (#27114) ## Description Add createBranch method implementation in StatsRecordingMetadataManager This started failing master build after merging - #26875 (Since branch support was merged earlier #26898) ## Motivation and Context Build fix ## Impact Add createBranch method implementation in StatsRecordingMetadataManager This started failing master build after merging - #26875 (Since branch support was merged earlier #26898) ## Test Plan NA ``` == NO RELEASE NOTE == ```
Description
Covers -
CREATE BRANCHCREATE BRANCHsupport for IcebergMotivation and Context
Resolves part of #22027
CREATE TAGsupport would be added in the subsequent PRImpact
Resolves part of #22027
SQL support for creating a branch from a table added based on the syntax discussed here :
Test Plan
Test added
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.