diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index 144d4826bd8fc..ae5567f7d8f61 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -1828,6 +1828,11 @@ private List computeFileNamesForMissingBuckets( @Override public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return beginInsertInternal(session, tableHandle); + } + + private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, ConnectorTableHandle tableHandle) { verifyJvmTimeZone(); @@ -1924,6 +1929,11 @@ private HiveCompressionCodec getHiveCompressionCodec(ConnectorSession session, b @Override public Optional finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) + { + return finishInsertInternal(session, insertHandle, fragments, computedStatistics); + } + + private Optional finishInsertInternal(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) { HiveInsertTableHandle handle = (HiveInsertTableHandle) insertHandle; @@ -2365,6 +2375,14 @@ public void createMaterializedView(ConnectorSession session, ConnectorTableMetad } Table basicTable = prepareTable(session, viewMetadata, MATERIALIZED_VIEW); + viewDefinition = new ConnectorMaterializedViewDefinition( + viewDefinition.getOriginalSql(), + viewDefinition.getSchema(), + viewDefinition.getTable(), + viewDefinition.getBaseTables(), + viewDefinition.getOwner(), + viewDefinition.getColumnMappings(), + Optional.of(getPartitionedBy(viewMetadata.getProperties()))); Map parameters = ImmutableMap.builder() .putAll(basicTable.getParameters()) .put(PRESTO_MATERIALIZED_VIEW_FLAG, "true") @@ -2412,6 +2430,18 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN } } + @Override + public HiveInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return beginInsertInternal(session, tableHandle); + } + + @Override + public Optional finishRefreshMaterializedView(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) + { + return finishInsertInternal(session, insertHandle, fragments, computedStatistics); + } + @Override public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle) { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java index e9d7f7026b2be..3178d26aa656f 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java @@ -492,6 +492,10 @@ private WriterParameters getWriterParametersForExistingUnpartitionedTable(Option private WriterParameters getWriterParametersForExistingPartitionedTable(String partitionName, OptionalInt bucketNumber) { + if (MetastoreUtil.isPrestoMaterializedView(table)) { + return getWriterParametersForOverwritePartition(partitionName); + } + switch (insertExistingPartitionsBehavior) { case APPEND: return getWriterParametersForAppendPartition(partitionName, bucketNumber); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java index 0b4ae0d17babc..186045aab54a8 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java @@ -50,6 +50,7 @@ import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestIntegrationSmokeTest; import com.facebook.presto.tests.DistributedQueryRunner; +import com.facebook.presto.tests.QueryAssertions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -134,6 +135,7 @@ import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.airlift.tpch.TpchTable.CUSTOMER; import static io.airlift.tpch.TpchTable.LINE_ITEM; +import static io.airlift.tpch.TpchTable.NATION; import static io.airlift.tpch.TpchTable.ORDERS; import static io.airlift.tpch.TpchTable.PART_SUPPLIER; import static java.lang.String.format; @@ -183,7 +185,7 @@ protected TestHiveIntegrationSmokeTest( protected QueryRunner createQueryRunner() throws Exception { - return HiveQueryRunner.createQueryRunner(ORDERS, CUSTOMER, LINE_ITEM, PART_SUPPLIER); + return HiveQueryRunner.createQueryRunner(ORDERS, CUSTOMER, LINE_ITEM, PART_SUPPLIER, NATION); } private List getPartitions(HiveTableLayoutHandle tableLayoutHandle) @@ -5537,6 +5539,104 @@ public void testDropMaterializedView() computeActual("DROP TABLE IF EXISTS test_customer_base_4"); } + @Test + public void testRefreshMaterializedViewSimple() + { + Session session = getSession(); + QueryRunner queryRunner = getQueryRunner(); + + computeActual( + "CREATE TABLE orders_partitioned WITH (partitioned_by = ARRAY['ds']) " + + "AS SELECT orderkey, orderpriority, '2020-01-01' as ds FROM orders WHERE orderkey < 1000 " + + "UNION ALL " + + "SELECT orderkey, orderpriority, '2019-01-02' as ds FROM orders WHERE orderkey > 1000"); + computeActual( + "CREATE MATERIALIZED VIEW test_orders_view WITH (partitioned_by = ARRAY['ds']) " + + "AS SELECT orderkey, orderpriority, ds FROM orders_partitioned"); + + String refreshSql = "REFRESH MATERIALIZED VIEW test_orders_view WHERE ds='2020-01-01'"; + String expectedInsertQuery = "SELECT orderkey, orderpriority, ds " + + "FROM (" + + " SELECT * FROM orders_partitioned WHERE ds='2020-01-01'" + + ") orders_partitioned"; + QueryAssertions.assertQuery( + queryRunner, + session, + refreshSql, + queryRunner, + "SELECT COUNT(*) FROM ( " + expectedInsertQuery + " )", + false, true); + } + + @Test + public void testRefreshMaterializedView() + { + Session session = getSession(); + QueryRunner queryRunner = getQueryRunner(); + + computeActual("CREATE TABLE test_nation_base_5 WITH (partitioned_by = ARRAY['nationkey', 'regionkey']) AS SELECT name, nationkey, regionkey FROM nation"); + computeActual("CREATE TABLE test_customer_base_5 WITH (partitioned_by = ARRAY['nationkey']) AS SELECT custkey, name, mktsegment, nationkey FROM customer"); + computeActual( + "CREATE MATERIALIZED VIEW test_customer_view_5 WITH (partitioned_by = ARRAY['marketsegment', 'nationkey', 'regionkey']" + retentionDays(30) + ") " + + "AS SELECT test_nation_base_5.name AS nationname, customer.custkey, customer.name AS customername, UPPER(customer.mktsegment) AS marketsegment, customer.nationkey, regionkey " + + "FROM test_nation_base_5 JOIN test_customer_base_5 customer ON (test_nation_base_5.nationkey = customer.nationkey)"); + + // Test predicate columns from two base tables + String refreshSql = "REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE marketsegment = 'AUTOMOBILE' AND nationkey = 24 AND regionkey = 1"; + String expectedInsertQuery = "SELECT *" + + "FROM (" + + " SELECT nation.name AS nationname, customer.custkey, customer.name AS customername, UPPER(customer.mktsegment) AS marketsegment, customer.nationkey, regionkey " + + " FROM (" + + " SELECT * FROM test_nation_base_5 WHERE regionkey = 1" + + " ) nation JOIN (" + + " SELECT * FROM test_customer_base_5 WHERE nationkey = 24" + + " ) customer ON (nation.nationkey = customer.nationkey)" + + ") WHERE marketsegment = 'AUTOMOBILE'"; + QueryAssertions.assertQuery( + queryRunner, + session, + refreshSql, + queryRunner, + "SELECT COUNT(*) FROM ( " + expectedInsertQuery + " )", + false, true); + + // Test predicate columns from one base table + refreshSql = "REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE marketsegment = 'AUTOMOBILE' AND nationkey = 24"; + expectedInsertQuery = "SELECT *" + + "FROM (" + + " SELECT nation.name AS nationname, customer.custkey, customer.name AS customername, UPPER(customer.mktsegment) AS marketsegment, customer.nationkey, regionkey " + + " FROM test_nation_base_5 nation JOIN (" + + " SELECT * FROM test_customer_base_5 WHERE nationkey = 24" + + " ) customer ON (nation.nationkey = customer.nationkey)" + + ") WHERE marketsegment = 'AUTOMOBILE'"; + QueryAssertions.assertQuery( + queryRunner, + session, + refreshSql, + queryRunner, + "SELECT COUNT(*) FROM ( " + expectedInsertQuery + " )", + false, true); + + refreshSql = "REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE regionkey = 1"; + expectedInsertQuery = "SELECT nation.name AS nationname, customer.custkey, customer.name AS customername, UPPER(customer.mktsegment) AS marketsegment, customer.nationkey, regionkey " + + "FROM (" + + " SELECT * FROM test_nation_base_5 WHERE regionkey = 1" + + ") nation JOIN test_customer_base_5 customer ON (nation.nationkey = customer.nationkey)"; + QueryAssertions.assertQuery( + queryRunner, + session, + refreshSql, + queryRunner, + "SELECT COUNT(*) FROM ( " + expectedInsertQuery + " )", + false, true); + + // Test invalid predicates + assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE nationname = 'UNITED STATES'", ".*Refresh materialized view by column nationname is not supported.*"); + assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE regionkey = 1 OR nationkey = 24", ".*Only logical AND is supported in WHERE clause.*"); + assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE regionkey + nationkey = 25", ".*Only columns specified on literals are supported in WHERE clause.*"); + assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5", ".*mismatched input ''\\. Expecting: '\\.', 'WHERE'.*"); + } + protected String retentionDays(int days) { return ""; diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java index 41cec760c764e..9481248c111cd 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java @@ -1040,7 +1040,7 @@ public void testVirtualBucketing() // TODO: plan verification https://github.com/prestodb/presto/issues/16031 // TODO: enable all materialized view tests after https://github.com/prestodb/presto/pull/15996 - @Test(enabled = false) + @Test public void testMaterializedViewOptimization() { QueryRunner queryRunner = getQueryRunner(); @@ -1053,8 +1053,7 @@ public void testMaterializedViewOptimization() assertUpdate("CREATE MATERIALIZED VIEW test_orders_view WITH (partitioned_by = ARRAY['ds']) " + "AS SELECT orderkey, orderpriority, ds FROM orders_partitioned"); assertTrue(getQueryRunner().tableExists(getSession(), "test_orders_view")); - assertUpdate("INSERT INTO test_orders_view(orderkey, orderpriority, ds) " + - "select orderkey, orderpriority, ds from orders_partitioned where ds='2020-01-01'", 255); + assertUpdate("REFRESH MATERIALIZED VIEW test_orders_view WHERE ds='2020-01-01'", 255); String viewQuery = "SELECT orderkey from test_orders_view where orderkey < 10000 ORDER BY orderkey"; String baseQuery = "SELECT orderkey from orders_partitioned where orderkey < 10000 ORDER BY orderkey"; @@ -1069,7 +1068,7 @@ public void testMaterializedViewOptimization() } } - @Test(enabled = false) + @Test public void testMaterializedViewOptimizationWithClause() { QueryRunner queryRunner = getQueryRunner(); @@ -1090,7 +1089,7 @@ public void testMaterializedViewOptimizationWithClause() table); assertUpdate(format("CREATE MATERIALIZED VIEW %s WITH (partitioned_by = ARRAY['ds']) AS %s", view, viewPart)); - assertUpdate(format("INSERT INTO %s(orderkey, orderpriority, ds) %s where ds='2020-01-01'", view, viewPart), 255); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s where ds='2020-01-01'", view), 255); String viewQuery = format("SELECT orderkey, orderpriority, ds from %s where orderkey < 100 ORDER BY orderkey", view); String baseQuery = viewPart + " where orderkey < 100 ORDER BY orderkey"; @@ -1105,7 +1104,7 @@ public void testMaterializedViewOptimizationWithClause() } } - @Test(enabled = false) + @Test public void testMaterializedViewOptimizationFullyMaterialized() { QueryRunner queryRunner = getQueryRunner(); @@ -1120,7 +1119,8 @@ public void testMaterializedViewOptimizationFullyMaterialized() assertUpdate(format("CREATE MATERIALIZED VIEW %s WITH (partitioned_by = ARRAY['ds']) " + "AS SELECT orderkey, orderpriority, ds FROM %s", view, table)); assertTrue(getQueryRunner().tableExists(getSession(), view)); - assertUpdate(format("INSERT INTO %s(orderkey, orderpriority, ds) select orderkey, orderpriority, ds from %s", view, table), 15000); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds = '2020-01-01'", view), 255); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds = '2019-01-02'", view), 14745); String viewQuery = format("SELECT orderkey from %s where orderkey < 10000 ORDER BY orderkey", view); String baseQuery = format("SELECT orderkey from %s where orderkey < 10000 ORDER BY orderkey", table); @@ -1135,7 +1135,7 @@ public void testMaterializedViewOptimizationFullyMaterialized() } } - @Test(enabled = false) + @Test public void testMaterializedViewOptimizationNotMaterialized() { String base = "orders_partitioned_not_materialized"; @@ -1165,7 +1165,7 @@ public void testMaterializedViewOptimizationNotMaterialized() } } - @Test(enabled = false) + @Test public void testMaterializedViewOptimizationWithNullPartition() { QueryRunner queryRunner = getQueryRunner(); @@ -1183,8 +1183,7 @@ public void testMaterializedViewOptimizationWithNullPartition() "SELECT orderkey, orderpriority, ds FROM %s", view, base)); assertTrue(getQueryRunner().tableExists(getSession(), view)); - assertUpdate(format("INSERT INTO %s(orderkey, orderpriority, ds) " + - "select orderkey, orderpriority, ds from %s where ds='2020-01-01'", view, base), 127); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds = '2020-01-01'", view), 127); String viewQuery = format("SELECT orderkey from %s where orderkey < 10000 ORDER BY orderkey", view); String baseQuery = format("SELECT orderkey from %s where orderkey < 10000 ORDER BY orderkey", base); @@ -1199,7 +1198,7 @@ public void testMaterializedViewOptimizationWithNullPartition() } } - @Test(enabled = false) + @Test public void testMaterializedViewWithLessGranularity() { QueryRunner queryRunner = getQueryRunner(); @@ -1217,8 +1216,7 @@ public void testMaterializedViewWithLessGranularity() assertTrue(getQueryRunner().tableExists(getSession(), view)); - assertUpdate(format("INSERT INTO %s(orderkey, orderpriority, ds) " + - "select orderkey, orderpriority, ds from %s where ds='2020-01-01'", view, base), 255); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds = '2020-01-01'", view), 255); String viewQuery = format("SELECT orderkey from %s where orderkey < 10000 ORDER BY orderkey", view); String baseQuery = format("SELECT orderkey from %s where orderkey < 10000 ORDER BY orderkey", base); @@ -1233,7 +1231,7 @@ public void testMaterializedViewWithLessGranularity() } } - @Test(enabled = false) + @Test public void testMaterializedViewForUnionAll() { QueryRunner queryRunner = getQueryRunner(); @@ -1250,7 +1248,7 @@ public void testMaterializedViewForUnionAll() assertUpdate(format("CREATE MATERIALIZED VIEW %s WITH (partitioned_by = ARRAY['nationkey']) " + "AS %s", view, baseQuery)); - assertUpdate(format("INSERT INTO %s(name, custkey, nationkey) %s WHERE nationkey < 10", view, baseQuery), 599); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE nationkey < 10", view, baseQuery), 599); String viewQuery = format("SELECT name, custkey, nationkey from %s ORDER BY name", view); baseQuery = format("%s ORDER BY name", baseQuery); @@ -1265,7 +1263,7 @@ public void testMaterializedViewForUnionAll() } } - @Test(enabled = false) + @Test public void testMaterializedViewForGroupingSet() { QueryRunner queryRunner = getQueryRunner(); @@ -1280,10 +1278,7 @@ public void testMaterializedViewForGroupingSet() "GROUP BY GROUPING SETS ((linenumber, shipmode), (shipmode))", view, base)); - assertUpdate(format("INSERT INTO %s(linenumber, quantity, shipmode) " + - "SELECT linenumber, SUM(DISTINCT CAST(quantity AS BIGINT)) quantity, shipmode FROM %s where shipmode='RAIL' " + - "GROUP BY GROUPING SETS ((linenumber, shipmode), (shipmode)) ", - view, base), 8); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE shipmode='RAIL'", view), 8); String viewQuery = format("SELECT * FROM %s ORDER BY linenumber, shipmode", view); String baseQuery = format("SELECT linenumber, SUM(DISTINCT CAST(quantity AS BIGINT)) quantity, shipmode FROM %s " + @@ -1299,7 +1294,7 @@ public void testMaterializedViewForGroupingSet() } } - @Test(enabled = false) + @Test public void testMaterializedViewWithDifferentPartitions() { QueryRunner queryRunner = getQueryRunner(); @@ -1317,8 +1312,7 @@ public void testMaterializedViewWithDifferentPartitions() assertTrue(getQueryRunner().tableExists(getSession(), view)); - assertUpdate(format("INSERT INTO %s(orderkey, orderpriority, ds, orderstatus) " + - "select orderkey, orderpriority, ds, orderstatus from %s where ds='2020-01-01'", view, base), 255); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds = '2020-01-01'", view), 255); String viewQuery = format("SELECT orderkey from %s where orderkey < 10000 ORDER BY orderkey", view); String baseQuery = format("SELECT orderkey from %s where orderkey < 10000 ORDER BY orderkey", base); @@ -1333,7 +1327,7 @@ public void testMaterializedViewWithDifferentPartitions() } } - @Test(enabled = false) + @Test public void testMaterializedViewJoinsWithOneTableAlias() { QueryRunner queryRunner = getQueryRunner(); @@ -1353,11 +1347,7 @@ public void testMaterializedViewJoinsWithOneTableAlias() "FROM %s JOIN %s customer ON (%s.nationkey = customer.nationkey)", view, table1, table1, table2, table1)); - assertUpdate(format("INSERT INTO %s(nationname, custkey, customername, marketsegment, nationkey, regionkey) " + - "SELECT %s.name AS nationname, customer.custkey, customer.name AS customername, UPPER(customer.mktsegment) " + - "AS marketsegment, customer.nationkey, regionkey FROM %s JOIN %s customer ON (%s.nationkey = customer.nationkey) " + - "WHERE customer.nationkey != 24 and %s.regionkey != 1", - view, table1, table1, table2, table1, table1), 1200); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE regionkey = 1", view), 300); String viewQuery = format("SELECT nationname, custkey from %s ORDER BY custkey", view); String baseQuery = format("SELECT %s.name AS nationname, customer.custkey FROM %s JOIN %s customer ON (%s.nationkey = customer.nationkey)" + @@ -1374,7 +1364,7 @@ public void testMaterializedViewJoinsWithOneTableAlias() } } - @Test(enabled = false) + @Test public void testMaterializedViewOptimizationWithDerivedFields() { QueryRunner queryRunner = getQueryRunner(); @@ -1393,9 +1383,7 @@ public void testMaterializedViewOptimizationWithDerivedFields() view, base)); assertTrue(getQueryRunner().tableExists(getSession(), view)); - assertUpdate(format("INSERT INTO %s(_discount_multi_extendedprice_, ds, shipmode) " + - "select SUM(discount*extendedprice), ds, shipmode from %s where ds='2020-01-01' group by ds, shipmode", - view, base), 7); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds='2020-01-01'", view), 7); String viewQuery = format("SELECT sum(_discount_multi_extendedprice_) from %s group by ds, shipmode ORDER BY sum(_discount_multi_extendedprice_)", view); String baseQuery = format("SELECT sum(discount * extendedprice) as _discount_multi_extendedprice_ from %s group by ds, shipmode " + @@ -1411,7 +1399,7 @@ public void testMaterializedViewOptimizationWithDerivedFields() } } - @Test(enabled = false) + @Test public void testMaterializedViewOptimizationWithDerivedFieldsWithAlias() { QueryRunner queryRunner = getQueryRunner(); @@ -1429,9 +1417,7 @@ public void testMaterializedViewOptimizationWithDerivedFieldsWithAlias() "FROM %s group by ds, shipmode", view, base)); assertTrue(getQueryRunner().tableExists(getSession(), view)); - assertUpdate(format("INSERT INTO %s(_discount_multi_extendedprice_, ds, view_shipmode) " + - "select SUM(discount*extendedprice), ds, shipmode from %s where ds='2020-01-01' group by ds, shipmode", - view, base), 7); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds='2020-01-01'", view, base), 7); String viewQuery = format("SELECT sum(_discount_multi_extendedprice_) from %s group by ds ORDER BY sum(_discount_multi_extendedprice_)", view); String baseQuery = format("SELECT sum(discount * extendedprice) as _discount_multi_extendedprice_ from %s group by ds " + @@ -1447,8 +1433,7 @@ public void testMaterializedViewOptimizationWithDerivedFieldsWithAlias() } } - //FIXME: It does not work as column map in the materialized View metadata is not correct. It only contains 1 map instead of 2. - // https://github.com/prestodb/presto/pull/15996 + //TODO: Populate columnMappings to cover all joined base tables, https://github.com/prestodb/presto/issues/16220 @Test(enabled = false) public void testMaterializedViewForJoin() { @@ -1473,10 +1458,7 @@ public void testMaterializedViewForJoin() assertTrue(queryRunner.tableExists(getSession(), view)); - assertUpdate(format("INSERT INTO %s(view_orderkey, view_totalprice, ds, view_orderpriority, view_orderstatus) " + - "SELECT SELECT t1.orderkey as view_orderkey, t2.totalprice as view_totalprice, t1.ds " + - " FROM %s t1 inner join %s t2 ON t1.ds=t2.ds" + - " where t1.ds='2020-01-01'", view, table1, table2), 65025); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds='2020-01-01'", view), 65025); String viewQuery = format("SELECT view_orderkey, view_totalprice from %s where view_orderkey < 10000", view); // getExplainPlan(viewQuery, LOGICAL); @@ -1488,7 +1470,7 @@ public void testMaterializedViewForJoin() } } - @Test(enabled = false) + @Test public void testMaterializedViewOptimizationWithDoublePartition() { QueryRunner queryRunner = getQueryRunner(); @@ -1502,8 +1484,7 @@ public void testMaterializedViewOptimizationWithDoublePartition() assertUpdate(format("CREATE MATERIALIZED VIEW %s WITH (partitioned_by = ARRAY['totalprice']) " + "AS SELECT orderkey, orderpriority, totalprice FROM %s", view, table)); assertTrue(getQueryRunner().tableExists(getSession(), view)); - assertUpdate(format("INSERT INTO %s(orderkey, orderpriority, totalprice) " + - "select orderkey, orderpriority, totalprice from %s where totalprice<65000", view, table), 3); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE totalprice<65000", view, table), 3); String viewQuery = format("SELECT orderkey from %s where orderkey < 10000 ORDER BY orderkey", view); String baseQuery = format("SELECT orderkey from %s where orderkey < 10000 ORDER BY orderkey", table); @@ -1518,7 +1499,7 @@ public void testMaterializedViewOptimizationWithDoublePartition() } } - @Test(enabled = false) + @Test public void testMaterializedViewForJoinWithMultiplePartitions() { QueryRunner queryRunner = getQueryRunner(); @@ -1541,10 +1522,7 @@ public void testMaterializedViewForJoinWithMultiplePartitions() "t1.ds as ds, t1.orderpriority as view_orderpriority, t2.orderstatus as view_orderstatus " + " FROM %s t1 inner join %s t2 ON t1.ds=t2.ds", view, table1, table2)); - assertUpdate(format("INSERT INTO %s(view_orderkey, view_totalprice, ds, view_orderpriority, view_orderstatus) " + - "SELECT t1.orderkey as view_orderkey, t2.totalprice as view_totalprice, t1.ds as ds, t1.orderpriority as view_orderpriority, " + - "t2.orderstatus as view_orderstatus FROM %s t1 inner join %s t2 ON t1.ds=t2.ds" + - " where t1.ds='2020-01-01'", view, table1, table2), 65025); + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds='2020-01-01'", view, table1, table2), 65025); String viewQuery = format("SELECT view_orderkey from %s where view_orderkey < 10000 ORDER BY view_orderkey", view); String baseQuery = format("SELECT t1.orderkey FROM %s t1" + diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePushdownIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePushdownIntegrationSmokeTest.java index 5a3ef2bdf5dbc..38a943872104d 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePushdownIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePushdownIntegrationSmokeTest.java @@ -26,6 +26,7 @@ import static com.facebook.presto.spi.security.SelectedRole.Type.ROLE; import static io.airlift.tpch.TpchTable.CUSTOMER; import static io.airlift.tpch.TpchTable.LINE_ITEM; +import static io.airlift.tpch.TpchTable.NATION; import static io.airlift.tpch.TpchTable.ORDERS; import static io.airlift.tpch.TpchTable.PART_SUPPLIER; @@ -45,7 +46,7 @@ protected QueryRunner createQueryRunner() throws Exception { return HiveQueryRunner.createQueryRunner( - ImmutableList.of(ORDERS, CUSTOMER, LINE_ITEM, PART_SUPPLIER), + ImmutableList.of(ORDERS, CUSTOMER, LINE_ITEM, PART_SUPPLIER, NATION), ImmutableMap.of("experimental.pushdown-subfields-enabled", "true", "experimental.pushdown-dereference-enabled", "true"), "sql-standard", diff --git a/presto-main/src/main/java/com/facebook/presto/execution/CreateMaterializedViewTask.java b/presto-main/src/main/java/com/facebook/presto/execution/CreateMaterializedViewTask.java index e837f0bb6b468..72048f62f940b 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/CreateMaterializedViewTask.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/CreateMaterializedViewTask.java @@ -133,7 +133,8 @@ public ListenableFuture execute(CreateMaterializedView statement, Transaction viewName.getObjectName(), baseTables, Optional.of(session.getUser()), - analysis.getOriginalColumnMapping(statement.getQuery())); + analysis.getOriginalColumnMapping(statement.getQuery()), + Optional.empty()); try { metadata.createMaterializedView(session, viewName.getCatalogName(), viewMetadata, viewDefinition, statement.isNotExists()); } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java index edd1a2b324b61..0f17b45fd3c4d 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java @@ -29,7 +29,8 @@ @JsonSubTypes({ @JsonSubTypes.Type(value = ExecutionWriterTarget.CreateHandle.class, name = "CreateHandle"), @JsonSubTypes.Type(value = ExecutionWriterTarget.InsertHandle.class, name = "InsertHandle"), - @JsonSubTypes.Type(value = ExecutionWriterTarget.DeleteHandle.class, name = "DeleteHandle")}) + @JsonSubTypes.Type(value = ExecutionWriterTarget.DeleteHandle.class, name = "DeleteHandle"), + @JsonSubTypes.Type(value = ExecutionWriterTarget.RefreshMaterializedViewHandle.class, name = "RefreshMaterializedViewHandle")}) @SuppressWarnings({"EmptyClass", "ClassMayBeInterface"}) public abstract class ExecutionWriterTarget { @@ -134,4 +135,38 @@ public String toString() return handle.toString(); } } + + public static class RefreshMaterializedViewHandle + extends ExecutionWriterTarget + { + private final InsertTableHandle handle; + private final SchemaTableName schemaTableName; + + @JsonCreator + public RefreshMaterializedViewHandle( + @JsonProperty("handle") InsertTableHandle handle, + @JsonProperty("schemaTableName") SchemaTableName schemaTableName) + { + this.handle = requireNonNull(handle, "handle is null"); + this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + } + + @JsonProperty + public InsertTableHandle getHandle() + { + return handle; + } + + @JsonProperty + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @Override + public String toString() + { + return handle.toString(); + } + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java index 4a931313443b1..179ef6b4689cc 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java @@ -96,6 +96,10 @@ private static Optional createWriterTarget(StreamingSubPl TableWriterNode.DeleteHandle delete = (TableWriterNode.DeleteHandle) target; return Optional.of(new ExecutionWriterTarget.DeleteHandle(metadata.beginDelete(session, delete.getHandle()), delete.getSchemaTableName())); } + if (target instanceof TableWriterNode.RefreshMaterializedViewReference) { + TableWriterNode.RefreshMaterializedViewReference refresh = (TableWriterNode.RefreshMaterializedViewReference) target; + return Optional.of(new ExecutionWriterTarget.RefreshMaterializedViewHandle(metadata.beginRefreshMaterializedView(session, refresh.getHandle()), refresh.getSchemaTableName())); + } throw new IllegalArgumentException("Unhandled target type: " + target.getClass().getSimpleName()); } diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java b/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java index 09f8f749ae5aa..a68ad26e3d8f0 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java @@ -380,6 +380,16 @@ public interface Metadata */ MaterializedViewStatus getMaterializedViewStatus(Session session, QualifiedObjectName materializedViewName); + /** + * Begin refresh materialized view + */ + InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle); + + /** + * Finish refresh materialized view + */ + Optional finishRefreshMaterializedView(Session session, InsertTableHandle tableHandle, Collection fragments, Collection computedStatistics); + /** * Try to locate a table index that can lookup results by indexableColumns and provide the requested outputColumns. */ diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java index 904be3f95e781..7f2da8751198a 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java @@ -1086,6 +1086,25 @@ public MaterializedViewStatus getMaterializedViewStatus(Session session, Qualifi return metadata.getMaterializedViewStatus(session.toConnectorSession(connectorId), toSchemaTableName(materializedViewName)); } + @Override + public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle) + { + ConnectorId connectorId = tableHandle.getConnectorId(); + CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, connectorId); + ConnectorMetadata metadata = catalogMetadata.getMetadata(); + ConnectorTransactionHandle transactionHandle = catalogMetadata.getTransactionHandleFor(connectorId); + ConnectorInsertTableHandle handle = metadata.beginRefreshMaterializedView(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle()); + return new InsertTableHandle(tableHandle.getConnectorId(), transactionHandle, handle); + } + + @Override + public Optional finishRefreshMaterializedView(Session session, InsertTableHandle tableHandle, Collection fragments, Collection computedStatistics) + { + ConnectorId connectorId = tableHandle.getConnectorId(); + ConnectorMetadata metadata = getMetadata(session, connectorId); + return metadata.finishRefreshMaterializedView(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle(), fragments, computedStatistics); + } + @Override public Optional resolveIndex(Session session, TableHandle tableHandle, Set indexableColumns, Set outputColumns, TupleDomain tupleDomain) { diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TableWriterOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/TableWriterOperator.java index 9eb4808b11e9b..1e44f170f3b00 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TableWriterOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TableWriterOperator.java @@ -25,6 +25,7 @@ import com.facebook.presto.execution.scheduler.ExecutionWriterTarget; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.CreateHandle; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.InsertHandle; +import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.RefreshMaterializedViewHandle; import com.facebook.presto.memory.context.LocalMemoryContext; import com.facebook.presto.metadata.ConnectorMetadataUpdaterManager; import com.facebook.presto.operator.OperationTimer.OperationTiming; @@ -112,7 +113,9 @@ public TableWriterOperatorFactory( this.pageSinkManager = requireNonNull(pageSinkManager, "pageSinkManager is null"); this.metadataUpdaterManager = requireNonNull(metadataUpdaterManager, "metadataUpdaterManager is null"); this.taskMetadataContext = requireNonNull(taskMetadataContext, "taskMetadataContext is null"); - checkArgument(writerTarget instanceof CreateHandle || writerTarget instanceof InsertHandle, "writerTarget must be CreateHandle or InsertHandle"); + checkArgument( + writerTarget instanceof CreateHandle || writerTarget instanceof InsertHandle || writerTarget instanceof RefreshMaterializedViewHandle, + "writerTarget must be CreateHandle or InsertHandle or RefreshMaterializedViewHandle"); this.target = requireNonNull(writerTarget, "writerTarget is null"); this.session = session; this.statisticsAggregationOperatorFactory = requireNonNull(statisticsAggregationOperatorFactory, "statisticsAggregationOperatorFactory is null"); @@ -159,6 +162,9 @@ private ConnectorPageSink createPageSink() if (target instanceof InsertHandle) { return pageSinkManager.createPageSink(session, ((InsertHandle) target).getHandle(), pageSinkContextBuilder.build()); } + if (target instanceof RefreshMaterializedViewHandle) { + return pageSinkManager.createPageSink(session, ((RefreshMaterializedViewHandle) target).getHandle(), pageSinkContextBuilder.build()); + } throw new UnsupportedOperationException("Unhandled target type: " + target.getClass().getName()); } @@ -172,6 +178,10 @@ private static ConnectorId getConnectorId(ExecutionWriterTarget handle) return ((InsertHandle) handle).getHandle().getConnectorId(); } + if (handle instanceof RefreshMaterializedViewHandle) { + return ((RefreshMaterializedViewHandle) handle).getHandle().getConnectorId(); + } + throw new UnsupportedOperationException("Unhandled target type: " + handle.getClass().getName()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java index adb43e37393ae..f2e709b1f0f9f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java @@ -143,6 +143,7 @@ public class Analysis private Optional createTableComment = Optional.empty(); private Optional insert = Optional.empty(); + private Optional refreshMaterializedViewAnalysis = Optional.empty(); private Optional analyzeTarget = Optional.empty(); // for describe input and describe output @@ -151,6 +152,9 @@ public class Analysis // for recursive view detection private final Deque tablesForView = new ArrayDeque<>(); + // To prevent recursive analyzing of one materialized view base table + private final ListMultimap, Table> tablesForMaterializedView = ArrayListMultimap.create(); + // for materialized view analysis state detection, state is used to identify if materialized view has been expanded or in-process. private final Map materializedViewAnalysisStateMap = new HashMap<>(); @@ -609,6 +613,16 @@ public Optional getInsert() return insert; } + public void setRefreshMaterializedViewAnalysis(RefreshMaterializedViewAnalysis refreshMaterializedViewAnalysis) + { + this.refreshMaterializedViewAnalysis = Optional.of(refreshMaterializedViewAnalysis); + } + + public Optional getRefreshMaterializedViewAnalysis() + { + return refreshMaterializedViewAnalysis; + } + public Query getNamedQuery(Table table) { return namedQueries.get(NodeRef.of(table)); @@ -663,6 +677,30 @@ public boolean hasTableInView(Table tableReference) return tablesForView.contains(tableReference); } + public void registerTableForMaterializedView(Table view, Table table) + { + requireNonNull(view, "view is null"); + requireNonNull(table, "table is null"); + + tablesForMaterializedView.put(NodeRef.of(view), table); + } + + public void unregisterTableForMaterializedView(Table view, Table table) + { + requireNonNull(view, "view is null"); + requireNonNull(table, "table is null"); + + tablesForMaterializedView.remove(NodeRef.of(view), table); + } + + public boolean hasTableRegisteredForMaterializedView(Table view, Table table) + { + requireNonNull(view, "view is null"); + requireNonNull(table, "table is null"); + + return tablesForMaterializedView.containsEntry(NodeRef.of(view), table); + } + public void setSampleRatio(SampledRelation relation, double ratio) { sampleRatios.put(NodeRef.of(relation), ratio); @@ -783,6 +821,37 @@ public TableHandle getTarget() } } + @Immutable + public static final class RefreshMaterializedViewAnalysis + { + private final TableHandle target; + private final List columns; + private final Query query; + + public RefreshMaterializedViewAnalysis(TableHandle target, List columns, Query query) + { + this.target = requireNonNull(target, "target is null"); + this.columns = requireNonNull(columns, "columns is null"); + this.query = requireNonNull(query, "query is null"); + checkArgument(columns.size() > 0, "No columns given to insert"); + } + + public List getColumns() + { + return columns; + } + + public TableHandle getTarget() + { + return target; + } + + public Query getQuery() + { + return query; + } + } + public static final class JoinUsingAnalysis { private final List leftJoinFields; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/RefreshMaterializedViewPredicateAnalyzer.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/RefreshMaterializedViewPredicateAnalyzer.java new file mode 100644 index 0000000000000..be0d6dfa3b590 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/RefreshMaterializedViewPredicateAnalyzer.java @@ -0,0 +1,167 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.analyzer; + +import com.facebook.presto.Session; +import com.facebook.presto.common.QualifiedObjectName; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.ConnectorMaterializedViewDefinition; +import com.facebook.presto.spi.MaterializedViewNotFoundException; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.sql.tree.ComparisonExpression; +import com.facebook.presto.sql.tree.DefaultTraversalVisitor; +import com.facebook.presto.sql.tree.DereferenceExpression; +import com.facebook.presto.sql.tree.Expression; +import com.facebook.presto.sql.tree.Identifier; +import com.facebook.presto.sql.tree.Literal; +import com.facebook.presto.sql.tree.LogicalBinaryExpression; +import com.facebook.presto.sql.tree.Node; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Optional; + +import static com.facebook.presto.metadata.MetadataUtil.toSchemaTableName; +import static com.facebook.presto.sql.analyzer.SemanticErrorCode.NOT_SUPPORTED; +import static com.facebook.presto.sql.analyzer.SemanticExceptions.missingAttributeException; +import static java.util.Collections.emptyList; +import static java.util.Objects.requireNonNull; + +/** + * Map predicates on view columns in the RefreshMaterializedView where clause to predicates on base table columns, + * which could be used for predicate push-down afterwards. Mapped predicates are connected by AND. + * For view columns that do not have a direct mapping to a base table column, keep the predicate with the view. + */ +public class RefreshMaterializedViewPredicateAnalyzer +{ + private RefreshMaterializedViewPredicateAnalyzer() {} + + public static Map extractTablePredicates( + QualifiedObjectName viewName, + Expression originalPredicate, + Scope viewScope, + Metadata metadata, + Session session) + { + ConnectorMaterializedViewDefinition viewDefinition = metadata.getMaterializedView(session, viewName) + .orElseThrow(() -> new MaterializedViewNotFoundException(toSchemaTableName(viewName))); + + Visitor visitor = new Visitor(viewDefinition, viewScope); + visitor.process(originalPredicate); + + return visitor.getTablePredicates(); + } + + /** + * Return a table to predicates map. Map key is materialized view name or base table name. + */ + private static class Visitor + extends DefaultTraversalVisitor + { + private final ImmutableMultimap.Builder tablePredicatesBuilder = ImmutableMultimap.builder(); + + private final ConnectorMaterializedViewDefinition viewDefinition; + private final Scope viewScope; + + private Visitor( + ConnectorMaterializedViewDefinition viewDefinition, + Scope viewScope) + { + this.viewDefinition = requireNonNull(viewDefinition, "viewDefinition is null"); + this.viewScope = requireNonNull(viewScope, "viewScope is null"); + } + + public Map getTablePredicates() + { + ImmutableMap.Builder tableConjuncts = ImmutableMap.builder(); + + tablePredicatesBuilder.build().asMap().forEach((table, predicateCollection) -> { + Optional conjunctOptional = predicateCollection.stream() + .reduce((left, right) -> new LogicalBinaryExpression(LogicalBinaryExpression.Operator.AND, left, right)); + + conjunctOptional.ifPresent(conjunct -> tableConjuncts.put(table, conjunct)); + }); + + return tableConjuncts.build(); + } + + @Override + public Void process(Node node, @Nullable Void context) + { + if (!(node instanceof ComparisonExpression || node instanceof LogicalBinaryExpression)) { + throw new SemanticException(NOT_SUPPORTED, node, "Only column specifications connected by logical AND are supported in WHERE clause."); + } + + return super.process(node, null); + } + + @Override + protected Void visitExpression(Expression node, Void context) + { + throw new SemanticException(NOT_SUPPORTED, node, "Only column specifications connected by logical AND are supported in WHERE clause."); + } + + @Override + protected Void visitLogicalBinaryExpression(LogicalBinaryExpression node, Void context) + { + if (!LogicalBinaryExpression.Operator.AND.equals(node.getOperator())) { + throw new SemanticException(NOT_SUPPORTED, node, "Only logical AND is supported in WHERE clause."); + } + if (!(node.getLeft() instanceof ComparisonExpression || node.getLeft() instanceof LogicalBinaryExpression)) { + throw new SemanticException(NOT_SUPPORTED, node.getLeft(), "Only column specifications connected by logical AND are supported in WHERE clause."); + } + if (!(node.getRight() instanceof ComparisonExpression || node.getRight() instanceof LogicalBinaryExpression)) { + throw new SemanticException(NOT_SUPPORTED, node.getRight(), "Only column specifications connected by logical AND are supported in WHERE clause."); + } + + return super.visitLogicalBinaryExpression(node, null); + } + + @Override + protected Void visitComparisonExpression(ComparisonExpression node, Void context) + { + if (!(node.getLeft() instanceof Identifier || node.getLeft() instanceof DereferenceExpression)) { + throw new SemanticException(NOT_SUPPORTED, node.getLeft(), "Only columns specified on literals are supported in WHERE clause."); + } + if (!(node.getRight() instanceof Literal)) { + throw new SemanticException(NOT_SUPPORTED, node.getRight(), "Only columns specified on literals are supported in WHERE clause."); + } + + ResolvedField resolvedField = viewScope.tryResolveField(node.getLeft()).orElseThrow(() -> missingAttributeException(node.getLeft())); + String column = resolvedField.getField().getOriginColumnName().orElseThrow(() -> missingAttributeException(node.getLeft())); + + if (!viewDefinition.getValidRefreshColumns().orElse(emptyList()).contains(column)) { + throw new SemanticException(NOT_SUPPORTED, node.getLeft(), "Refresh materialized view by column %s is not supported.", node.getLeft().toString()); + } + + Map baseTableColumns = viewDefinition.getColumnMappingsAsMap().get(column); + if (baseTableColumns != null) { + for (SchemaTableName baseTable : baseTableColumns.keySet()) { + tablePredicatesBuilder.put( + baseTable, + new ComparisonExpression(node.getOperator(), new Identifier(baseTableColumns.get(baseTable)), node.getRight())); + } + } + else { + SchemaTableName viewName = new SchemaTableName(viewDefinition.getSchema(), viewDefinition.getTable()); + tablePredicatesBuilder.put(viewName, node); + } + + return null; + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java index 5bb8d8c544bab..34fde52d44c38 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java @@ -118,6 +118,7 @@ import com.facebook.presto.sql.tree.Query; import com.facebook.presto.sql.tree.QueryBody; import com.facebook.presto.sql.tree.QuerySpecification; +import com.facebook.presto.sql.tree.RefreshMaterializedView; import com.facebook.presto.sql.tree.Relation; import com.facebook.presto.sql.tree.RenameColumn; import com.facebook.presto.sql.tree.RenameSchema; @@ -180,6 +181,7 @@ import static com.facebook.presto.common.type.UnknownType.UNKNOWN; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.metadata.MetadataUtil.createQualifiedObjectName; +import static com.facebook.presto.metadata.MetadataUtil.toSchemaTableName; import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedDataPredicates; import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; @@ -191,6 +193,7 @@ import static com.facebook.presto.sql.NodeUtils.mapFromProperties; import static com.facebook.presto.sql.ParsingUtil.createParsingOptions; import static com.facebook.presto.sql.QueryUtil.selectList; +import static com.facebook.presto.sql.QueryUtil.simpleQuery; import static com.facebook.presto.sql.analyzer.AggregationAnalyzer.verifyOrderByAggregations; import static com.facebook.presto.sql.analyzer.AggregationAnalyzer.verifySourceAggregations; import static com.facebook.presto.sql.analyzer.Analysis.MaterializedViewAnalysisState; @@ -203,6 +206,7 @@ import static com.facebook.presto.sql.analyzer.ExpressionTreeUtils.extractWindowFunctions; import static com.facebook.presto.sql.analyzer.MaterializedViewPlanValidator.MaterializedViewPlanValidatorContext; import static com.facebook.presto.sql.analyzer.PredicateStitcher.PredicateStitcherContext; +import static com.facebook.presto.sql.analyzer.RefreshMaterializedViewPredicateAnalyzer.extractTablePredicates; import static com.facebook.presto.sql.analyzer.ScopeReferenceExtractor.hasReferencesToScope; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.AMBIGUOUS_ATTRIBUTE; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.COLUMN_NAME_NOT_SPECIFIED; @@ -222,6 +226,7 @@ import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_ATTRIBUTE; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_CATALOG; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_COLUMN; +import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_MATERIALIZED_VIEW; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_SCHEMA; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_TABLE; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MUST_BE_WINDOW_FUNCTION; @@ -700,6 +705,78 @@ private void validateMaterialziedViewQueryPlan(Statement query) validator.process(query, new MaterializedViewPlanValidatorContext()); } + @Override + protected Scope visitRefreshMaterializedView(RefreshMaterializedView node, Optional scope) + { + analysis.setUpdateType("INSERT"); + + QualifiedObjectName viewName = createQualifiedObjectName(session, node.getTarget(), node.getTarget().getName()); + + ConnectorMaterializedViewDefinition view = metadata.getMaterializedView(session, viewName) + .orElseThrow(() -> new SemanticException(MISSING_MATERIALIZED_VIEW, node, "Materialized view '%s' does not exist", viewName)); + + accessControl.checkCanInsertIntoTable(session.getRequiredTransactionId(), session.getIdentity(), session.getAccessControlContext(), viewName); + + Scope viewScope = process(node.getTarget(), scope); + Map tablePredicates = extractTablePredicates(viewName, node.getWhere(), viewScope, metadata, session); + + Query viewQuery = parseView(view.getOriginalSql(), viewName, node); + Query refreshQuery = tablePredicates.containsKey(toSchemaTableName(viewName)) ? + buildQueryWithPredicate(viewQuery, tablePredicates.get(toSchemaTableName(viewName))) + : viewQuery; + process(refreshQuery, scope); + + TableHandle tableHandle = metadata.getTableHandle(session, viewName) + .orElseThrow(() -> new SemanticException(MISSING_MATERIALIZED_VIEW, node, "Materialized view '%s' does not exist", viewName)); + Map columnHandles = metadata.getColumnHandles(session, tableHandle); + List targetColumnHandles = metadata.getTableMetadata(session, tableHandle).getColumns().stream() + .filter(column -> !column.isHidden()) + .map(column -> columnHandles.get(column.getName())) + .collect(toImmutableList()); + analysis.setRefreshMaterializedViewAnalysis(new Analysis.RefreshMaterializedViewAnalysis( + tableHandle, + targetColumnHandles, + refreshQuery)); + + return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT)); + } + + private Optional analyzeBaseTableForRefreshMaterializedView(Table baseTable, Optional scope) + { + checkState(analysis.getStatement() instanceof RefreshMaterializedView, "Not analyzing RefreshMaterializedView statement"); + + RefreshMaterializedView refreshMaterializedView = (RefreshMaterializedView) analysis.getStatement(); + QualifiedObjectName viewName = createQualifiedObjectName(session, refreshMaterializedView.getTarget(), refreshMaterializedView.getTarget().getName()); + + Scope viewScope = process(refreshMaterializedView.getTarget(), scope); + Map tablePredicates = extractTablePredicates(viewName, refreshMaterializedView.getWhere(), viewScope, metadata, session); + + SchemaTableName baseTableName = toSchemaTableName(createQualifiedObjectName(session, baseTable, baseTable.getName())); + if (tablePredicates.containsKey(baseTableName)) { + Query tableSubquery = buildQueryWithPredicate(baseTable, tablePredicates.get(baseTableName)); + analysis.registerNamedQuery(baseTable, tableSubquery); + + Scope subqueryScope = process(tableSubquery, scope); + + return Optional.of(subqueryScope.getRelationType().withAlias(baseTableName.getTableName(), null)); + } + + return Optional.empty(); + } + + private Query buildQueryWithPredicate(Table table, Expression predicate) + { + Query query = simpleQuery(selectList(new AllColumns()), table, predicate); + return (Query) sqlParser.createStatement( + SqlFormatterUtil.getFormattedSql(query, sqlParser, Optional.empty()), + createParsingOptions(session, warningCollector)); + } + + private Query buildQueryWithPredicate(Query originalQuery, Expression predicate) + { + return simpleQuery(selectList(new AllColumns()), new TableSubquery(originalQuery), predicate); + } + @Override protected Scope visitCreateFunction(CreateFunction node, Optional scope) { @@ -1173,6 +1250,20 @@ protected Scope visitTable(Table table, Optional scope) } analysis.registerTable(table, tableHandle.get()); + + if (statement instanceof RefreshMaterializedView) { + Table view = ((RefreshMaterializedView) statement).getTarget(); + if (!table.equals(view) && !analysis.hasTableRegisteredForMaterializedView(view, table)) { + analysis.registerTableForMaterializedView(view, table); + Optional descriptor = analyzeBaseTableForRefreshMaterializedView(table, scope); + analysis.unregisterTableForMaterializedView(view, table); + + if (descriptor.isPresent()) { + return createAndAssignScope(table, scope, descriptor.get()); + } + } + } + return createAndAssignScope(table, scope, fields.build()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index ecbff4b63b318..eaddaf878411e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -37,6 +37,7 @@ import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.CreateHandle; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.DeleteHandle; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.InsertHandle; +import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.RefreshMaterializedViewHandle; import com.facebook.presto.execution.scheduler.TableWriteInfo; import com.facebook.presto.execution.scheduler.TableWriteInfo.DeleteScanInfo; import com.facebook.presto.expressions.DynamicFilters; @@ -3175,6 +3176,9 @@ else if (target instanceof DeleteHandle) { metadata.finishDelete(session, ((DeleteHandle) target).getHandle(), fragments); return Optional.empty(); } + else if (target instanceof RefreshMaterializedViewHandle) { + return metadata.finishRefreshMaterializedView(session, ((RefreshMaterializedViewHandle) target).getHandle(), fragments, statistics); + } else { throw new AssertionError("Unhandled target type: " + target.getClass().getName()); } @@ -3190,6 +3194,9 @@ private static PageSinkCommitter createPageSinkCommitter(Session session, Metada else if (target instanceof InsertHandle) { return metadata.commitPageSinkAsync(session, ((InsertHandle) target).getHandle(), fragments); } + else if (target instanceof RefreshMaterializedViewHandle) { + return metadata.commitPageSinkAsync(session, ((RefreshMaterializedViewHandle) target).getHandle(), fragments); + } else { throw new AssertionError("Unhandled target type: " + target.getClass().getName()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java index 5678fef0987ec..62f77d27de67d 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java @@ -76,6 +76,7 @@ import com.facebook.presto.sql.tree.NodeRef; import com.facebook.presto.sql.tree.NullLiteral; import com.facebook.presto.sql.tree.Query; +import com.facebook.presto.sql.tree.RefreshMaterializedView; import com.facebook.presto.sql.tree.Statement; import com.facebook.presto.sql.tree.SymbolReference; import com.google.common.collect.ImmutableList; @@ -102,6 +103,7 @@ import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT; import static com.facebook.presto.sql.planner.plan.TableWriterNode.CreateName; import static com.facebook.presto.sql.planner.plan.TableWriterNode.InsertReference; +import static com.facebook.presto.sql.planner.plan.TableWriterNode.RefreshMaterializedViewReference; import static com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget; import static com.facebook.presto.sql.relational.Expressions.constant; import static com.facebook.presto.sql.relational.OriginalExpressionUtils.castToRowExpression; @@ -251,6 +253,10 @@ else if (statement instanceof Query) { else if (statement instanceof Explain && ((Explain) statement).isAnalyze()) { return createExplainAnalyzePlan(analysis, (Explain) statement); } + else if (statement instanceof RefreshMaterializedView) { + checkState(analysis.getRefreshMaterializedViewAnalysis().isPresent(), "RefreshMaterializedView analysis is missing"); + return createRefreshMaterializedViewPlan(analysis, (RefreshMaterializedView) statement); + } else { throw new PrestoException(NOT_SUPPORTED, "Unsupported statement type " + statement.getClass().getSimpleName()); } @@ -343,11 +349,36 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query) statisticsMetadata); } + private RelationPlan createRefreshMaterializedViewPlan(Analysis analysis, RefreshMaterializedView refreshMaterializedViewStatement) + { + Analysis.RefreshMaterializedViewAnalysis viewAnalysis = analysis.getRefreshMaterializedViewAnalysis().get(); + + TableHandle tableHandle = viewAnalysis.getTarget(); + List columnHandles = viewAnalysis.getColumns(); + WriterTarget target = new RefreshMaterializedViewReference(tableHandle, metadata.getTableMetadata(session, tableHandle).getTable()); + + return buildInternalInsertPlan(tableHandle, columnHandles, viewAnalysis.getQuery(), analysis, target); + } + private RelationPlan createInsertPlan(Analysis analysis, Insert insertStatement) { - Analysis.Insert insert = analysis.getInsert().get(); + Analysis.Insert insertAnalysis = analysis.getInsert().get(); - TableMetadata tableMetadata = metadata.getTableMetadata(session, insert.getTarget()); + TableHandle tableHandle = insertAnalysis.getTarget(); + List columnHandles = insertAnalysis.getColumns(); + WriterTarget target = new InsertReference(tableHandle, metadata.getTableMetadata(session, tableHandle).getTable()); + + return buildInternalInsertPlan(tableHandle, columnHandles, insertStatement.getQuery(), analysis, target); + } + + private RelationPlan buildInternalInsertPlan( + TableHandle tableHandle, + List columnHandles, + Query query, + Analysis analysis, + WriterTarget target) + { + TableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle); List visibleTableColumns = tableMetadata.getColumns().stream() .filter(column -> !column.isHidden()) @@ -356,16 +387,16 @@ private RelationPlan createInsertPlan(Analysis analysis, Insert insertStatement) .map(ColumnMetadata::getName) .collect(toImmutableList()); - RelationPlan plan = createRelationPlan(analysis, insertStatement.getQuery()); + RelationPlan plan = createRelationPlan(analysis, query); - Map columns = metadata.getColumnHandles(session, insert.getTarget()); + Map columns = metadata.getColumnHandles(session, tableHandle); Assignments.Builder assignments = Assignments.builder(); for (ColumnMetadata column : tableMetadata.getColumns()) { if (column.isHidden()) { continue; } VariableReferenceExpression output = variableAllocator.newVariable(column.getName(), column.getType()); - int index = insert.getColumns().indexOf(columns.get(column.getName())); + int index = columnHandles.indexOf(columns.get(column.getName())); if (index < 0) { Expression cast = new Cast(new NullLiteral(), column.getType().getTypeSignature().toString()); assignments.put(output, castToRowExpression(cast)); @@ -393,16 +424,16 @@ private RelationPlan createInsertPlan(Analysis analysis, Insert insertStatement) plan = new RelationPlan(projectNode, scope, projectNode.getOutputVariables()); - Optional newTableLayout = metadata.getInsertLayout(session, insert.getTarget()); - Optional preferredShuffleLayout = metadata.getPreferredShuffleLayoutForInsert(session, insert.getTarget()); + Optional newTableLayout = metadata.getInsertLayout(session, tableHandle); + Optional preferredShuffleLayout = metadata.getPreferredShuffleLayoutForInsert(session, tableHandle); - String catalogName = insert.getTarget().getConnectorId().getCatalogName(); + String catalogName = tableHandle.getConnectorId().getCatalogName(); TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadataForWrite(session, catalogName, tableMetadata.getMetadata()); return createTableWriterPlan( analysis, plan, - new InsertReference(insert.getTarget(), metadata.getTableMetadata(session, insert.getTarget()).getTable()), + target, visibleTableColumnNames, visibleTableColumns, newTableLayout, diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index aa8427e3acbec..55cc87c6aa1be 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -71,6 +71,7 @@ import com.facebook.presto.sql.planner.plan.TableWriterNode; import com.facebook.presto.sql.planner.plan.TableWriterNode.CreateName; import com.facebook.presto.sql.planner.plan.TableWriterNode.InsertReference; +import com.facebook.presto.sql.planner.plan.TableWriterNode.RefreshMaterializedViewReference; import com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.WindowNode; @@ -1131,7 +1132,7 @@ public GroupedExecutionProperties visitTableWriter(TableWriterNode node, Void co GroupedExecutionProperties properties = node.getSource().accept(this, null); boolean recoveryEligible = properties.isRecoveryEligible(); WriterTarget target = node.getTarget().orElseThrow(() -> new VerifyException("target is absent")); - if (target instanceof CreateName || target instanceof InsertReference) { + if (target instanceof CreateName || target instanceof InsertReference || target instanceof RefreshMaterializedViewReference) { recoveryEligible &= metadata.getConnectorCapabilities(session, target.getConnectorId()).contains(SUPPORTS_PAGE_SINK_COMMIT); } else { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java index ba78b91c44c49..b1634df3fe15d 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java @@ -332,4 +332,40 @@ public String toString() return handle.toString(); } } + + public static class RefreshMaterializedViewReference + extends WriterTarget + { + private final TableHandle handle; + private final SchemaTableName schemaTableName; + + public RefreshMaterializedViewReference(TableHandle handle, SchemaTableName schemaTableName) + { + this.handle = requireNonNull(handle, "handle is null"); + this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + } + + public TableHandle getHandle() + { + return handle; + } + + @Override + public ConnectorId getConnectorId() + { + return handle.getConnectorId(); + } + + @Override + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @Override + public String toString() + { + return handle.toString(); + } + } } diff --git a/presto-main/src/main/java/com/facebook/presto/util/StatementUtils.java b/presto-main/src/main/java/com/facebook/presto/util/StatementUtils.java index 199d3bd3a8be2..841c2ea61d904 100644 --- a/presto-main/src/main/java/com/facebook/presto/util/StatementUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/util/StatementUtils.java @@ -43,6 +43,7 @@ import com.facebook.presto.sql.tree.Insert; import com.facebook.presto.sql.tree.Prepare; import com.facebook.presto.sql.tree.Query; +import com.facebook.presto.sql.tree.RefreshMaterializedView; import com.facebook.presto.sql.tree.RenameColumn; import com.facebook.presto.sql.tree.RenameSchema; import com.facebook.presto.sql.tree.RenameTable; @@ -87,6 +88,7 @@ private StatementUtils() {} builder.put(CreateTableAsSelect.class, QueryType.INSERT); builder.put(Insert.class, QueryType.INSERT); + builder.put(RefreshMaterializedView.class, QueryType.INSERT); builder.put(Delete.class, QueryType.DELETE); diff --git a/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java b/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java index 7842a987a0f8a..3f098a3c95a65 100644 --- a/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java +++ b/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java @@ -438,6 +438,18 @@ public MaterializedViewStatus getMaterializedViewStatus(Session session, Qualifi throw new UnsupportedOperationException(); } + @Override + public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle) + { + throw new UnsupportedOperationException(); + } + + @Override + public Optional finishRefreshMaterializedView(Session session, InsertTableHandle tableHandle, Collection fragments, Collection computedStatistics) + { + throw new UnsupportedOperationException(); + } + @Override public Optional resolveIndex(Session session, TableHandle tableHandle, Set indexableColumns, Set outputColumns, TupleDomain tupleDomain) { diff --git a/presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4 b/presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4 index d642a7f901ffd..a65ae6560791d 100644 --- a/presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4 +++ b/presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4 @@ -65,6 +65,7 @@ statement (COMMENT string)? (WITH properties)? AS (query | '('query')') #createMaterializedView | DROP MATERIALIZED VIEW (IF EXISTS)? qualifiedName #dropMaterializedView + | REFRESH MATERIALIZED VIEW qualifiedName WHERE booleanExpression #refreshMaterializedView | CREATE (OR REPLACE)? TEMPORARY? FUNCTION functionName=qualifiedName '(' (sqlParameterDeclaration (',' sqlParameterDeclaration)*)? ')' RETURNS returnType=type @@ -568,7 +569,7 @@ nonReserved | NAME | NFC | NFD | NFKC | NFKD | NO | NONE | NULLIF | NULLS | ONLY | OPTION | ORDINALITY | OUTPUT | OVER | PARTITION | PARTITIONS | POSITION | PRECEDING | PRIVILEGES | PROPERTIES - | RANGE | READ | RENAME | REPEATABLE | REPLACE | RESET | RESPECT | RESTRICT | RETURN | RETURNS | REVOKE | ROLE | ROLES | ROLLBACK | ROW | ROWS + | RANGE | READ | REFRESH | RENAME | REPEATABLE | REPLACE | RESET | RESPECT | RESTRICT | RETURN | RETURNS | REVOKE | ROLE | ROLES | ROLLBACK | ROW | ROWS | SCHEMA | SCHEMAS | SECOND | SECURITY | SERIALIZABLE | SESSION | SET | SETS | SQL | SHOW | SOME | START | STATS | SUBSTRING | SYSTEM | TABLES | TABLESAMPLE | TEMPORARY | TEXT | TIME | TIMESTAMP | TO | TRANSACTION | TRY_CAST | TYPE @@ -716,6 +717,7 @@ PROPERTIES: 'PROPERTIES'; RANGE: 'RANGE'; READ: 'READ'; RECURSIVE: 'RECURSIVE'; +REFRESH: 'REFRESH'; RENAME: 'RENAME'; REPEATABLE: 'REPEATABLE'; REPLACE: 'REPLACE'; diff --git a/presto-parser/src/main/java/com/facebook/presto/sql/SqlFormatter.java b/presto-parser/src/main/java/com/facebook/presto/sql/SqlFormatter.java index 6ededfc66b726..79040c15ae828 100644 --- a/presto-parser/src/main/java/com/facebook/presto/sql/SqlFormatter.java +++ b/presto-parser/src/main/java/com/facebook/presto/sql/SqlFormatter.java @@ -72,6 +72,7 @@ import com.facebook.presto.sql.tree.QualifiedName; import com.facebook.presto.sql.tree.Query; import com.facebook.presto.sql.tree.QuerySpecification; +import com.facebook.presto.sql.tree.RefreshMaterializedView; import com.facebook.presto.sql.tree.Relation; import com.facebook.presto.sql.tree.RenameColumn; import com.facebook.presto.sql.tree.RenameSchema; @@ -682,6 +683,17 @@ protected Void visitDropMaterializedView(DropMaterializedView node, Integer cont return null; } + @Override + protected Void visitRefreshMaterializedView(RefreshMaterializedView node, Integer context) + { + builder.append("REFRESH MATERIALIZED VIEW ") + .append(formatName(node.getTarget().getName())) + .append(" WHERE ") + .append(formatExpression(node.getWhere(), parameters)); + + return null; + } + @Override protected Void visitExplain(Explain node, Integer indent) { diff --git a/presto-parser/src/main/java/com/facebook/presto/sql/parser/AstBuilder.java b/presto-parser/src/main/java/com/facebook/presto/sql/parser/AstBuilder.java index 08a79b162e8a9..5f0b3e0733616 100644 --- a/presto-parser/src/main/java/com/facebook/presto/sql/parser/AstBuilder.java +++ b/presto-parser/src/main/java/com/facebook/presto/sql/parser/AstBuilder.java @@ -116,6 +116,7 @@ import com.facebook.presto.sql.tree.Query; import com.facebook.presto.sql.tree.QueryBody; import com.facebook.presto.sql.tree.QuerySpecification; +import com.facebook.presto.sql.tree.RefreshMaterializedView; import com.facebook.presto.sql.tree.Relation; import com.facebook.presto.sql.tree.RenameColumn; import com.facebook.presto.sql.tree.RenameSchema; @@ -357,6 +358,15 @@ public Node visitDropMaterializedView(SqlBaseParser.DropMaterializedViewContext return new DropMaterializedView(Optional.of(getLocation(context)), getQualifiedName(context.qualifiedName()), context.EXISTS() != null); } + @Override + public Node visitRefreshMaterializedView(SqlBaseParser.RefreshMaterializedViewContext context) + { + return new RefreshMaterializedView( + getLocation(context), + new Table(getLocation(context), getQualifiedName(context.qualifiedName())), + (Expression) visit(context.booleanExpression())); + } + @Override public Node visitInsertInto(SqlBaseParser.InsertIntoContext context) { diff --git a/presto-parser/src/main/java/com/facebook/presto/sql/tree/AstVisitor.java b/presto-parser/src/main/java/com/facebook/presto/sql/tree/AstVisitor.java index af9564137b7d0..742c55baaa914 100644 --- a/presto-parser/src/main/java/com/facebook/presto/sql/tree/AstVisitor.java +++ b/presto-parser/src/main/java/com/facebook/presto/sql/tree/AstVisitor.java @@ -602,6 +602,11 @@ protected R visitDropMaterializedView(DropMaterializedView node, C context) return visitStatement(node, context); } + protected R visitRefreshMaterializedView(RefreshMaterializedView node, C context) + { + return visitStatement(node, context); + } + protected R visitCreateFunction(CreateFunction node, C context) { return visitStatement(node, context); diff --git a/presto-parser/src/main/java/com/facebook/presto/sql/tree/DefaultTraversalVisitor.java b/presto-parser/src/main/java/com/facebook/presto/sql/tree/DefaultTraversalVisitor.java index 2200a39f10e30..1c84b36d3a146 100644 --- a/presto-parser/src/main/java/com/facebook/presto/sql/tree/DefaultTraversalVisitor.java +++ b/presto-parser/src/main/java/com/facebook/presto/sql/tree/DefaultTraversalVisitor.java @@ -572,6 +572,15 @@ protected R visitCreateMaterializedView(CreateMaterializedView node, C context) return null; } + @Override + protected R visitRefreshMaterializedView(RefreshMaterializedView node, C context) + { + process(node.getTarget(), context); + process(node.getWhere(), context); + + return null; + } + @Override protected R visitSetSession(SetSession node, C context) { diff --git a/presto-parser/src/main/java/com/facebook/presto/sql/tree/RefreshMaterializedView.java b/presto-parser/src/main/java/com/facebook/presto/sql/tree/RefreshMaterializedView.java new file mode 100644 index 0000000000000..fbc4988ea7024 --- /dev/null +++ b/presto-parser/src/main/java/com/facebook/presto/sql/tree/RefreshMaterializedView.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.tree; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class RefreshMaterializedView + extends Statement +{ + private final Table target; + private final Expression where; + + public RefreshMaterializedView(Table target, Expression where) + { + this(Optional.empty(), target, where); + } + + public RefreshMaterializedView(NodeLocation location, Table target, Expression where) + { + this(Optional.of(location), target, where); + } + + private RefreshMaterializedView(Optional location, Table target, Expression where) + { + super(location); + this.target = requireNonNull(target, "target is null"); + this.where = requireNonNull(where, "where is null"); + } + + public Table getTarget() + { + return target; + } + + public Expression getWhere() + { + return where; + } + + @Override + public R accept(AstVisitor visitor, C context) + { + return visitor.visitRefreshMaterializedView(this, context); + } + + @Override + public List getChildren() + { + return ImmutableList.of(where); + } + + @Override + public int hashCode() + { + return Objects.hash(where); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + RefreshMaterializedView o = (RefreshMaterializedView) obj; + return Objects.equals(target, o.target) && + Objects.equals(where, o.where); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("name", target) + .add("where", where) + .toString(); + } +} diff --git a/presto-parser/src/test/java/com/facebook/presto/sql/parser/TestSqlParser.java b/presto-parser/src/test/java/com/facebook/presto/sql/parser/TestSqlParser.java index 7ffef1dec2620..9346580de1707 100644 --- a/presto-parser/src/test/java/com/facebook/presto/sql/parser/TestSqlParser.java +++ b/presto-parser/src/test/java/com/facebook/presto/sql/parser/TestSqlParser.java @@ -100,6 +100,7 @@ import com.facebook.presto.sql.tree.QuantifiedComparisonExpression; import com.facebook.presto.sql.tree.Query; import com.facebook.presto.sql.tree.QuerySpecification; +import com.facebook.presto.sql.tree.RefreshMaterializedView; import com.facebook.presto.sql.tree.RenameColumn; import com.facebook.presto.sql.tree.RenameSchema; import com.facebook.presto.sql.tree.RenameTable; @@ -1377,6 +1378,21 @@ public void testDropMaterializedView() assertStatement("DROP MATERIALIZED VIEW IF EXISTS a.b.c", new DropMaterializedView(Optional.empty(), QualifiedName.of("a", "b", "c"), true)); } + @Test + public void testRefreshMaterializedView() + { + assertStatement( + "REFRESH MATERIALIZED VIEW a WHERE p = 'x'", + new RefreshMaterializedView( + table(QualifiedName.of("a")), + new ComparisonExpression(ComparisonExpression.Operator.EQUAL, new Identifier("p"), new StringLiteral("x")))); + assertStatement( + "REFRESH MATERIALIZED VIEW a.b WHERE p = 'x'", + new RefreshMaterializedView( + table(QualifiedName.of("a", "b")), + new ComparisonExpression(ComparisonExpression.Operator.EQUAL, new Identifier("p"), new StringLiteral("x")))); + } + @Test public void testDropFunction() { diff --git a/presto-parser/src/test/java/com/facebook/presto/sql/parser/TestSqlParserErrorHandling.java b/presto-parser/src/test/java/com/facebook/presto/sql/parser/TestSqlParserErrorHandling.java index c07a77a48e57c..725f8d17b20df 100644 --- a/presto-parser/src/test/java/com/facebook/presto/sql/parser/TestSqlParserErrorHandling.java +++ b/presto-parser/src/test/java/com/facebook/presto/sql/parser/TestSqlParserErrorHandling.java @@ -40,10 +40,10 @@ public Object[][] getStatements() return new Object[][] { {"", "line 1:1: mismatched input ''. Expecting: 'ALTER', 'ANALYZE', 'CALL', 'COMMIT', 'CREATE', 'DEALLOCATE', 'DELETE', 'DESC', 'DESCRIBE', 'DROP', 'EXECUTE', 'EXPLAIN', 'GRANT', " + - "'INSERT', 'PREPARE', 'RESET', 'REVOKE', 'ROLLBACK', 'SET', 'SHOW', 'START', 'USE', "}, + "'INSERT', 'PREPARE', 'REFRESH', 'RESET', 'REVOKE', 'ROLLBACK', 'SET', 'SHOW', 'START', 'USE', "}, {"@select", "line 1:1: mismatched input '@'. Expecting: 'ALTER', 'ANALYZE', 'CALL', 'COMMIT', 'CREATE', 'DEALLOCATE', 'DELETE', 'DESC', 'DESCRIBE', 'DROP', 'EXECUTE', 'EXPLAIN', 'GRANT', " + - "'INSERT', 'PREPARE', 'RESET', 'REVOKE', 'ROLLBACK', 'SET', 'SHOW', 'START', 'USE', "}, + "'INSERT', 'PREPARE', 'REFRESH', 'RESET', 'REVOKE', 'ROLLBACK', 'SET', 'SHOW', 'START', 'USE', "}, {"select * from foo where @what", "line 1:25: mismatched input '@'. Expecting: "}, {"select * from 'oops", diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java index 305a724ddfb0b..aba04b5532260 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java @@ -571,6 +571,9 @@ else if (writerTarget instanceof ExecutionWriterTarget.CreateHandle) { else if (writerTarget instanceof ExecutionWriterTarget.InsertHandle) { connectorId = ((ExecutionWriterTarget.InsertHandle) writerTarget).getHandle().getConnectorId(); } + else if (writerTarget instanceof ExecutionWriterTarget.RefreshMaterializedViewHandle) { + connectorId = ((ExecutionWriterTarget.RefreshMaterializedViewHandle) writerTarget).getHandle().getConnectorId(); + } else { throw new IllegalArgumentException("unexpected writer target type: " + writerTarget.getClass()); } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorMaterializedViewDefinition.java b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorMaterializedViewDefinition.java index adcef0b72ede8..e2204ccfc1f52 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorMaterializedViewDefinition.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorMaterializedViewDefinition.java @@ -35,6 +35,7 @@ public final class ConnectorMaterializedViewDefinition private final List baseTables; private final Optional owner; private final List columnMappings; + private final Optional> validRefreshColumns; @JsonCreator public ConnectorMaterializedViewDefinition( @@ -43,7 +44,8 @@ public ConnectorMaterializedViewDefinition( @JsonProperty("table") String table, @JsonProperty("baseTables") List baseTables, @JsonProperty("owner") Optional owner, - @JsonProperty("columnMapping") List columnMappings) + @JsonProperty("columnMapping") List columnMappings, + @JsonProperty("validRefreshColumns") Optional> validRefreshColumns) { this.originalSql = requireNonNull(originalSql, "originalSql is null"); this.schema = requireNonNull(schema, "schema is null"); @@ -51,6 +53,7 @@ public ConnectorMaterializedViewDefinition( this.baseTables = unmodifiableList(new ArrayList<>(requireNonNull(baseTables, "baseTables is null"))); this.owner = requireNonNull(owner, "owner is null"); this.columnMappings = unmodifiableList(new ArrayList<>(requireNonNull(columnMappings, "columnMappings is null"))); + this.validRefreshColumns = requireNonNull(validRefreshColumns, "validRefreshColumns is null").map(columns -> unmodifiableList(new ArrayList<>(columns))); } @JsonIgnore @@ -60,9 +63,17 @@ public ConnectorMaterializedViewDefinition( String table, List baseTables, Optional owner, - Map> originalColumnMapping) + Map> originalColumnMapping, + Optional> validRefreshColumns) { - this(originalSql, schema, table, baseTables, owner, convertFromMapToColumnMappings(requireNonNull(originalColumnMapping, "originalColumnMapping is null"), new SchemaTableName(schema, table))); + this( + originalSql, + schema, + table, + baseTables, + owner, + convertFromMapToColumnMappings(requireNonNull(originalColumnMapping, "originalColumnMapping is null"), new SchemaTableName(schema, table)), + validRefreshColumns); } @JsonProperty @@ -101,6 +112,12 @@ public List getColumnMappings() return columnMappings; } + @JsonProperty + public Optional> getValidRefreshColumns() + { + return validRefreshColumns; + } + @Override public String toString() { @@ -111,6 +128,7 @@ public String toString() sb.append(",baseTables=").append(baseTables); sb.append(",owner=").append(owner.orElse(null)); sb.append(",columnMappings=").append(columnMappings); + sb.append(",validRefreshColumns=").append(validRefreshColumns.orElse(null)); sb.append("}"); return sb.toString(); } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java index 63894836f68f4..d3d4c9c23c578 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java @@ -594,6 +594,22 @@ default MaterializedViewStatus getMaterializedViewStatus(ConnectorSession sessio throw new PrestoException(NOT_SUPPORTED, "This connector does not support getting materialized views status"); } + /** + * Begin refresh materialized view + */ + default ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle) + { + throw new PrestoException(NOT_SUPPORTED, "This connector does not support refresh materialized views"); + } + + /** + * Finish refresh materialized view + */ + default Optional finishRefreshMaterializedView(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) + { + throw new PrestoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata finishRefreshMaterializedView() is not implemented"); + } + /** * @return whether delete without table scan is supported */ diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java index 61d546ef9561a..e031ab639566e 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java @@ -509,6 +509,22 @@ public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session } } + @Override + public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.beginRefreshMaterializedView(session, tableHandle); + } + } + + @Override + public Optional finishRefreshMaterializedView(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.finishRefreshMaterializedView(session, insertHandle, fragments, computedStatistics); + } + } + @Override public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) {