
HIVE-28763: Iceberg: Support functions while expiring snapshots. #5643

Merged (2 commits) on Feb 18, 2025
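In short, this change lets the timestamp argument of ALTER TABLE ... EXECUTE EXPIRE_SNAPSHOTS be any expression that folds to a constant at analysis time, instead of only a quoted string literal. A sketch of the forms this enables, drawn from the tests below (the table name t is illustrative):

ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS('2025-02-18 10:15:23.386');              -- string literal (as before)
ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(DATE('1985-10-10'));                     -- function call
ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(TIMESTAMP('1987-10-10 10:15:23.386'));   -- function call
ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(CURRENT_DATE + 5);                       -- date arithmetic
ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(CURRENT_TIMESTAMP);
ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS BETWEEN (CURRENT_DATE - 1) AND '2025-02-18 10:15:23.386000000';

Expressions that do not fold to a constant (e.g. RAND()) are rejected with "Invalid timestamp expression".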
======================================================================
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -146,6 +147,48 @@ public void testExpireSnapshotsWithDefaultParams() throws IOException, InterruptedException

}

@Test
public void testExpireSnapshotsWithFunction() throws IOException, InterruptedException {
TableIdentifier identifier = TableIdentifier.of("default", "source");
Table table =
testTables.createTableWithVersions(shell, identifier.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 5);
Assert.assertEquals(5, table.history().size());
shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(DATE('1985-10-10'))");
table.refresh();
Assert.assertEquals(5, table.history().size());
shell.executeStatement(
"ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(TIMESTAMP('1987-10-10 10:15:23.386'))");
table.refresh();
Assert.assertEquals(5, table.history().size());
shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(CURRENT_DATE + 5)");
table.refresh();
Assert.assertEquals(1, table.history().size());
testTables.appendIcebergTable(shell.getHiveConf(), table, fileFormat, null,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
table.refresh();
Assert.assertEquals(2, table.history().size());
shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(CURRENT_TIMESTAMP)");
table.refresh();
Assert.assertEquals(1, table.history().size());

// Test with between keyword
testTables.appendIcebergTable(shell.getHiveConf(), table, fileFormat, null,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
table.refresh();
Assert.assertEquals(2, table.history().size());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");
String toTime = simpleDateFormat.format(new Date(table.history().get(0).timestampMillis()));
shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS BETWEEN " +
"(CURRENT_DATE - 1) AND '" + toTime + "'");
table.refresh();
Assert.assertEquals(1, IterableUtils.size(table.snapshots()));
AssertHelpers.assertThrows("Invalid timestamp expression", IllegalArgumentException.class, () ->
shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS BETWEEN " +
"(RAND()) AND '" + toTime + "'"));
}

@Test
public void testDeleteOrphanFiles() throws IOException, InterruptedException {
TableIdentifier identifier = TableIdentifier.of("default", "source");
======================================================================
@@ -493,22 +493,28 @@ alterStatementSuffixExecute
@after { gParent.popMsg(state); }
: KW_EXECUTE KW_ROLLBACK LPAREN (rollbackParam=(StringLiteral | Number)) RPAREN
-> ^(TOK_ALTERTABLE_EXECUTE KW_ROLLBACK $rollbackParam)
-    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS (LPAREN (expireParam=StringLiteral) RPAREN)?
+    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS (LPAREN (expireParam=expression) RPAREN)?
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $expireParam?)
| KW_EXECUTE KW_SET_CURRENT_SNAPSHOT LPAREN (snapshotParam=expression) RPAREN
-> ^(TOK_ALTERTABLE_EXECUTE KW_SET_CURRENT_SNAPSHOT $snapshotParam)
| KW_EXECUTE KW_FAST_FORWARD sourceBranch=StringLiteral (targetBranch=StringLiteral)?
-> ^(TOK_ALTERTABLE_EXECUTE KW_FAST_FORWARD $sourceBranch $targetBranch?)
| KW_EXECUTE KW_CHERRY_PICK snapshotId=Number
-> ^(TOK_ALTERTABLE_EXECUTE KW_CHERRY_PICK $snapshotId)
-    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN (fromTimestamp=StringLiteral) KW_AND (toTimestamp=StringLiteral)
+    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN
+        fromTimestamp=timestampExpression KW_AND toTimestamp=timestampExpression
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp $toTimestamp)
| KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN KW_LAST numToRetain=Number
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN $numToRetain)
| KW_EXECUTE KW_DELETE KW_ORPHAN_FILES (KW_OLDER KW_THAN LPAREN (timestamp=StringLiteral) RPAREN)?
-> ^(TOK_ALTERTABLE_EXECUTE KW_ORPHAN_FILES $timestamp?)
;

timestampExpression
: StringLiteral -> StringLiteral
| LPAREN expression RPAREN -> expression
;
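The new timestampExpression rule accepts either a quoted string literal (the pre-existing behavior) or an arbitrary expression wrapped in parentheses; the LPAREN/RPAREN are what let the parser tell a computed bound from a bare literal, which is why the test writes its computed bound as (CURRENT_DATE - 1). An illustrative statement (table name t assumed):

ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS BETWEEN (CURRENT_DATE - 1) AND '2025-02-18 10:15:23.386000000';
-- the first bound matches the LPAREN expression RPAREN alternative; the second matches StringLiteral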

alterStatementSuffixRenameBranch
@init { gParent.pushMsg("alter table rename branch", state); }
@after { gParent.popMsg(state); }
======================================================================
@@ -39,7 +39,12 @@
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExpireSnapshotsSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.FastForwardSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.RollbackSpec;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionState;

@@ -85,7 +90,7 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partitionSpec,
desc = getRollbackDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
break;
case HiveParser.KW_EXPIRE_SNAPSHOTS:
-      desc = getExpireSnapshotDesc(tableName, partitionSpec, command.getChildren());
+      desc = getExpireSnapshotDesc(tableName, partitionSpec, command.getChildren(), queryState.getConf());
break;
case HiveParser.KW_SET_CURRENT_SNAPSHOT:
desc = getSetCurrentSnapshotDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
@@ -98,7 +103,7 @@
break;
case HiveParser.KW_ORPHAN_FILES:
desc = getDeleteOrphanFilesDesc(tableName, partitionSpec, command.getChildren());
break;
}

rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
@@ -139,7 +144,7 @@ private static AlterTableExecuteDesc getSetCurrentSnapshotDesc(TableName tableName,
}

private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName tableName, Map<String, String> partitionSpec,
-      List<Node> children) throws SemanticException {
+      List<Node> children, HiveConf conf) throws SemanticException {
AlterTableExecuteSpec<ExpireSnapshotsSpec> spec;
if (children.size() == 1) {
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, null);
@@ -158,19 +163,33 @@ private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName tableName,
} else if (children.size() == 3) {
ASTNode secondNode = (ASTNode) children.get(2);
String secondNodeText = PlanUtils.stripQuotes(secondNode.getText().trim());
-      TimestampTZ fromTime = TimestampTZUtil.parse(firstNodeText, timeZone);
-      TimestampTZ toTime = TimestampTZUtil.parse(secondNodeText, timeZone);
+      TimestampTZ fromTime = TimestampTZUtil.parse(getTimeStampString(conf, firstNode, firstNodeText), timeZone);
+      TimestampTZ toTime = TimestampTZUtil.parse(getTimeStampString(conf, secondNode, secondNodeText), timeZone);
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT,
new ExpireSnapshotsSpec(fromTime.toEpochMilli(), toTime.toEpochMilli()));
} else if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(firstNodeText).matches()) {
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(firstNodeText));
} else {
-      TimestampTZ time = TimestampTZUtil.parse(firstNodeText, timeZone);
+      TimestampTZ time = TimestampTZUtil.parse(getTimeStampString(conf, firstNode, firstNodeText), timeZone);
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(time.toEpochMilli()));
}
return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
}
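Taken together, getExpireSnapshotDesc dispatches on the shape of the argument: no argument expires with the table's default retention, a BETWEEN clause yields a from/to range, a value matching EXPIRE_SNAPSHOT_BY_ID_REGEX is treated as snapshot ID(s), and anything else is parsed as a timestamp, now routed through getTimeStampString so expressions work too. Illustrative statements for each path (table name and snapshot ID are made up):

ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS;                                                        -- defaults
ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS BETWEEN (CURRENT_DATE - 1) AND '2025-02-18 00:00:00';   -- range
ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS('1234567890123456789');                                 -- by snapshot ID
ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(CURRENT_TIMESTAMP);                                     -- by timestamp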

private static String getTimeStampString(HiveConf conf, ASTNode node, String nodeText) throws SemanticException {
if (node.getChildCount() > 0) {
QueryState queryState = new QueryState.Builder().withGenerateNewQueryId(false).withHiveConf(conf).build();
SemanticAnalyzer sem = (SemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, node);
ExprNodeDesc desc = sem.genExprNodeDesc(node, new RowResolver(), false, true);
if (!(desc instanceof ExprNodeConstantDesc)) {
throw new SemanticException("Invalid timestamp expression");
}
ExprNodeConstantDesc constantDesc = (ExprNodeConstantDesc) desc;
[inline review thread on the cast above]

Contributor:
I'd check if it is a constant or not. This throws a ClassCastException:

shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(RAND())");

Additionally, we may check whether the type of constantDesc is acceptable here.

Member (author):
I added a check that desc is of type ExprNodeConstantDesc. As for whether constantDesc is acceptable, I think the original logic takes care of that: it should throw a date-time parsing exception, the same as when someone provides a non-timestamp string.

[end review thread]
return String.valueOf(constantDesc.getValue());
}
return nodeText;
}
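getTimeStampString only engages when the AST node has children, i.e. the argument is an expression rather than a bare literal. It builds a temporary QueryState, has a SemanticAnalyzer constant-fold the expression (genExprNodeDesc is called with foldExpr=true), and accepts the result only if folding produced an ExprNodeConstantDesc, whose value is then handed to TimestampTZUtil.parse exactly as a literal would be. Illustrative behavior (table name t assumed):

ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(CURRENT_DATE + 5);   -- folds to a constant: accepted
ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(RAND());             -- non-deterministic, does not fold: "Invalid timestamp expression"

A value that folds but is not a valid timestamp still fails later, in TimestampTZUtil.parse, the same way a malformed string literal does.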

private static AlterTableExecuteDesc getRollbackDesc(TableName tableName, Map<String, String> partitionSpec,
ASTNode childNode) throws SemanticException {
AlterTableExecuteSpec<RollbackSpec> spec;
======================================================================
@@ -13533,7 +13533,7 @@ public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input)
return genExprNodeDesc(expr, input, true, false);
}

-  ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, boolean useCaching,
+  public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, boolean useCaching,
boolean foldExpr) throws SemanticException {
TypeCheckCtx tcCtx = new TypeCheckCtx(input, useCaching, foldExpr);
return genExprNodeDesc(expr, input, tcCtx);
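(This overload of genExprNodeDesc is widened from package-private to public so that the alter-table execute analyzer, which lives outside org.apache.hadoop.hive.ql.parse, can call it to fold the EXPIRE_SNAPSHOTS argument; see getTimeStampString above.)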