From 430fd9ede9a30fc727c5ccd3330a5329732c3135 Mon Sep 17 00:00:00 2001
From: Ayush Saxena
Date: Thu, 13 Feb 2025 04:40:55 +0530
Subject: [PATCH 1/2] HIVE-28763: Iceberg: Support functions while expiring snapshots.

---
 .../hive/TestHiveIcebergExpireSnapshots.java  | 39 +++++++++++++++++++
 .../hadoop/hive/ql/parse/AlterClauseParser.g  | 10 ++++-
 .../execute/AlterTableExecuteAnalyzer.java    | 28 ++++++++++---
 .../hive/ql/parse/SemanticAnalyzer.java       |  2 +-
 4 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
index 9f036a5615a5..861f44e2e2f4 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
@@ -146,6 +146,45 @@ public void testExpireSnapshotsWithDefaultParams() throws IOException, Interrupt
   }
 
+  @Test
+  public void testExpireSnapshotsWithFunction() throws IOException, InterruptedException {
+    TableIdentifier identifier = TableIdentifier.of("default", "source");
+    Table table =
+        testTables.createTableWithVersions(shell, identifier.name(),
+            HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
+            HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 5);
+    Assert.assertEquals(5, table.history().size());
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(DATE('1985-10-10'))");
+    table.refresh();
+    Assert.assertEquals(5, table.history().size());
+    shell.executeStatement(
+        "ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(TIMESTAMP('1987-10-10 10:15:23.386'))");
+    table.refresh();
+    Assert.assertEquals(5, table.history().size());
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(CURRENT_DATE + 5)");
+    table.refresh();
+    Assert.assertEquals(1, table.history().size());
+    testTables.appendIcebergTable(shell.getHiveConf(), table, fileFormat, null,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    table.refresh();
+    Assert.assertEquals(2, table.history().size());
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(CURRENT_TIMESTAMP)");
+    table.refresh();
+    Assert.assertEquals(1, table.history().size());
+
+    // Test with between keyword
+    testTables.appendIcebergTable(shell.getHiveConf(), table, fileFormat, null,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    table.refresh();
+    Assert.assertEquals(2, table.history().size());
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");
+    String toTime = simpleDateFormat.format(new Date(table.history().get(0).timestampMillis()));
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS BETWEEN " +
+        "(CURRENT_DATE - 1) AND '" + toTime + "'");
+    table.refresh();
+    Assert.assertEquals(1, IterableUtils.size(table.snapshots()));
+  }
+
   @Test
   public void testDeleteOrphanFiles() throws IOException, InterruptedException {
     TableIdentifier identifier = TableIdentifier.of("default", "source");
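For reference, the statement shapes the new test drives look like the following. The table name t stands in for the test's default.source table, and the literal upper bound in the BETWEEN form stands in for the test's dynamically computed toTime; timestamps are interpreted in the session time zone.

    ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(DATE('1985-10-10'));
    ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(TIMESTAMP('1987-10-10 10:15:23.386'));
    ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(CURRENT_DATE + 5);
    ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(CURRENT_TIMESTAMP);
    ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS BETWEEN (CURRENT_DATE - 1) AND '2025-02-13 00:00:00.000000000';

Snapshots older than the given instant are expired (the current snapshot is always retained), which is consistent with the assertions above: the 1985 and 1987 cutoffs leave all five snapshots in place, while CURRENT_DATE + 5 and CURRENT_TIMESTAMP prune everything but the latest one.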
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index b896e3d35b1c..4ec922962050 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -493,7 +493,7 @@ alterStatementSuffixExecute
 @after { gParent.popMsg(state); }
     : KW_EXECUTE KW_ROLLBACK LPAREN (rollbackParam=(StringLiteral | Number)) RPAREN
     -> ^(TOK_ALTERTABLE_EXECUTE KW_ROLLBACK $rollbackParam)
-    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS (LPAREN (expireParam=StringLiteral) RPAREN)?
+    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS (LPAREN (expireParam=expression) RPAREN)?
     -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $expireParam?)
     | KW_EXECUTE KW_SET_CURRENT_SNAPSHOT LPAREN (snapshotParam=expression) RPAREN
     -> ^(TOK_ALTERTABLE_EXECUTE KW_SET_CURRENT_SNAPSHOT $snapshotParam)
@@ -501,7 +501,8 @@ alterStatementSuffixExecute
     -> ^(TOK_ALTERTABLE_EXECUTE KW_FAST_FORWARD $sourceBranch $targetBranch?)
     | KW_EXECUTE KW_CHERRY_PICK snapshotId=Number
     -> ^(TOK_ALTERTABLE_EXECUTE KW_CHERRY_PICK $snapshotId)
-    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN (fromTimestamp=StringLiteral) KW_AND (toTimestamp=StringLiteral)
+    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN
+      fromTimestamp=timestampExpression KW_AND toTimestamp=timestampExpression
     -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp $toTimestamp)
     | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN KW_LAST numToRetain=Number
     -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN $numToRetain)
@@ -509,6 +510,11 @@ alterStatementSuffixExecute
     -> ^(TOK_ALTERTABLE_EXECUTE KW_ORPHAN_FILES $timestamp?)
     ;
 
+timestampExpression
+    : StringLiteral -> StringLiteral
+    | LPAREN expression RPAREN -> expression
+    ;
+
 alterStatementSuffixRenameBranch
 @init { gParent.pushMsg("alter table rename branch", state); }
 @after { gParent.popMsg(state); }
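With the new timestampExpression rule, each bound of BETWEEN is either a quoted timestamp literal (the pre-existing form) or an arbitrary expression wrapped in parentheses. The parentheses appear to do real work here: the two bounds are separated by KW_AND, and an unparenthesized expression could itself contain AND, so the LPAREN/RPAREN wrapping keeps the bounds unambiguous. A sketch of a mixed statement the rule accepts (table name t is a placeholder):

    ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS BETWEEN '2024-01-01 00:00:00' AND (CURRENT_TIMESTAMP);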
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index 275a0e1a4c56..75ca9fd74d01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -39,7 +39,12 @@
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExpireSnapshotsSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.FastForwardSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.RollbackSpec;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
@@ -85,7 +90,7 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
         desc = getRollbackDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
         break;
       case HiveParser.KW_EXPIRE_SNAPSHOTS:
-        desc = getExpireSnapshotDesc(tableName, partitionSpec, command.getChildren());
+        desc = getExpireSnapshotDesc(tableName, partitionSpec, command.getChildren(), queryState.getConf());
         break;
       case HiveParser.KW_SET_CURRENT_SNAPSHOT:
         desc = getSetCurrentSnapshotDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
@@ -98,7 +103,7 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
         break;
       case HiveParser.KW_ORPHAN_FILES:
         desc = getDeleteOrphanFilesDesc(tableName, partitionSpec, command.getChildren());
-        break;
+        break;
     }
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
 
@@ -139,7 +144,7 @@ private static AlterTableExecuteDesc getSetCurrentSnapshotDesc(TableName tableNa
   }
 
   private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName tableName, Map<String, String> partitionSpec,
-      List<Node> children) throws SemanticException {
+      List<Node> children, HiveConf conf) throws SemanticException {
     AlterTableExecuteSpec spec;
     if (children.size() == 1) {
       spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, null);
@@ -158,19 +163,30 @@ private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName tableName,
     } else if (children.size() == 3) {
       ASTNode secondNode = (ASTNode) children.get(2);
       String secondNodeText = PlanUtils.stripQuotes(secondNode.getText().trim());
-      TimestampTZ fromTime = TimestampTZUtil.parse(firstNodeText, timeZone);
-      TimestampTZ toTime = TimestampTZUtil.parse(secondNodeText, timeZone);
+      TimestampTZ fromTime = TimestampTZUtil.parse(getTimeStampString(conf, firstNode, firstNodeText), timeZone);
+      TimestampTZ toTime = TimestampTZUtil.parse(getTimeStampString(conf, secondNode, secondNodeText), timeZone);
       spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT,
           new ExpireSnapshotsSpec(fromTime.toEpochMilli(), toTime.toEpochMilli()));
     } else if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(firstNodeText).matches()) {
       spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(firstNodeText));
     } else {
-      TimestampTZ time = TimestampTZUtil.parse(firstNodeText, timeZone);
+      TimestampTZ time = TimestampTZUtil.parse(getTimeStampString(conf, firstNode, firstNodeText), timeZone);
       spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(time.toEpochMilli()));
     }
     return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
   }
 
+  private static String getTimeStampString(HiveConf conf, ASTNode node, String nodeText) throws SemanticException {
+    if (node.getChildCount() > 0) {
+      QueryState queryState = new QueryState.Builder().withGenerateNewQueryId(false).withHiveConf(conf).build();
+      SemanticAnalyzer sem = (SemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, node);
+      ExprNodeDesc desc = sem.genExprNodeDesc(node, new RowResolver(), false, true);
+      ExprNodeConstantDesc constantDesc = (ExprNodeConstantDesc) desc;
+      return String.valueOf(constantDesc.getValue());
+    }
+    return nodeText;
+  }
+
   private static AlterTableExecuteDesc getRollbackDesc(TableName tableName, Map<String, String> partitionSpec,
       ASTNode childNode) throws SemanticException {
     AlterTableExecuteSpec spec;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 26267654a4b5..e8f0bd380e28 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -13533,7 +13533,7 @@ public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input)
     return genExprNodeDesc(expr, input, true, false);
   }
 
-  ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, boolean useCaching,
+  public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, boolean useCaching,
       boolean foldExpr) throws SemanticException {
     TypeCheckCtx tcCtx = new TypeCheckCtx(input, useCaching, foldExpr);
     return genExprNodeDesc(expr, input, tcCtx);
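Taken together, the analyzer changes work as follows: a bare string literal is a childless AST node, so getTimeStampString returns its text untouched and the old behavior is preserved; any node with children is treated as an expression, constant-folded by a throwaway SemanticAnalyzer via genExprNodeDesc(node, new RowResolver(), false, true) (caching off, folding on), and the folded constant is then handed to TimestampTZUtil.parse exactly like a literal. That folding call is also why the four-argument genExprNodeDesc overload in SemanticAnalyzer.java is widened to public. Both paths, sketched against a placeholder table t:

    -- Literal: leaf node, text parsed directly
    ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS('2024-01-01 00:00:00');
    -- Expression: folded to a constant at analysis time, then parsed the same way
    ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS(CURRENT_DATE + 5);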
From 9482cc8138eb58ed680975f957a4fb77e1d64344 Mon Sep 17 00:00:00 2001
From: Ayush Saxena
Date: Tue, 18 Feb 2025 00:46:52 +0530
Subject: [PATCH 2/2] Address Review Comments.

---
 .../iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java      | 4 ++++
 .../java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g | 2 +-
 .../hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java | 3 +++
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
index 861f44e2e2f4..ac8eeeb71b2f 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -183,6 +184,9 @@ public void testExpireSnapshotsWithFunction() throws IOException, InterruptedExc
         "(CURRENT_DATE - 1) AND '" + toTime + "'");
     table.refresh();
     Assert.assertEquals(1, IterableUtils.size(table.snapshots()));
+    AssertHelpers.assertThrows("Invalid timestamp expression", IllegalArgumentException.class, () ->
+        shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS BETWEEN " +
+            "(RAND()) AND '" + toTime + "'"));
   }
 
   @Test
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index 4ec922962050..5c4fb550d039 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -501,7 +501,7 @@ alterStatementSuffixExecute
     -> ^(TOK_ALTERTABLE_EXECUTE KW_FAST_FORWARD $sourceBranch $targetBranch?)
     | KW_EXECUTE KW_CHERRY_PICK snapshotId=Number
     -> ^(TOK_ALTERTABLE_EXECUTE KW_CHERRY_PICK $snapshotId)
-    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN
+    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN
       fromTimestamp=timestampExpression KW_AND toTimestamp=timestampExpression
     -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp $toTimestamp)
     | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN KW_LAST numToRetain=Number
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index 75ca9fd74d01..96b2bdac7484 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -181,6 +181,9 @@ private static String getTimeStampString(HiveConf conf, ASTNode node, String nod
       QueryState queryState = new QueryState.Builder().withGenerateNewQueryId(false).withHiveConf(conf).build();
       SemanticAnalyzer sem = (SemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, node);
       ExprNodeDesc desc = sem.genExprNodeDesc(node, new RowResolver(), false, true);
+      if (!(desc instanceof ExprNodeConstantDesc)) {
+        throw new SemanticException("Invalid timestamp expression");
+      }
       ExprNodeConstantDesc constantDesc = (ExprNodeConstantDesc) desc;
      return String.valueOf(constantDesc.getValue());
     }
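The guard added in this second patch covers expressions that cannot be folded to a constant: RAND() is non-deterministic, so genExprNodeDesc returns something other than an ExprNodeConstantDesc, and without the instanceof check the cast on the next line would fail with a ClassCastException. With the check, analysis fails cleanly with "Invalid timestamp expression", which the new test observes as the IllegalArgumentException thrown by the test shell. A statement that now fails fast (table name t is a placeholder):

    -- Rejected during analysis: RAND() does not fold to a constant
    ALTER TABLE t EXECUTE EXPIRE_SNAPSHOTS BETWEEN (RAND()) AND '2025-02-18 00:00:00';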