Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1828,6 +1828,11 @@ private List<String> computeFileNamesForMissingBuckets(

@Override
public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Comment thread
gggrace14 marked this conversation as resolved.
Outdated
return beginInsertInternal(session, tableHandle);
}

private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, ConnectorTableHandle tableHandle)
{
verifyJvmTimeZone();

Expand Down Expand Up @@ -1924,6 +1929,11 @@ private HiveCompressionCodec getHiveCompressionCodec(ConnectorSession session, b

@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return finishInsertInternal(session, insertHandle, fragments, computedStatistics);
Comment thread
gggrace14 marked this conversation as resolved.
Outdated
}

private Optional<ConnectorOutputMetadata> finishInsertInternal(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
HiveInsertTableHandle handle = (HiveInsertTableHandle) insertHandle;

Expand Down Expand Up @@ -2365,6 +2375,14 @@ public void createMaterializedView(ConnectorSession session, ConnectorTableMetad
}

Table basicTable = prepareTable(session, viewMetadata, MATERIALIZED_VIEW);
viewDefinition = new ConnectorMaterializedViewDefinition(
viewDefinition.getOriginalSql(),
viewDefinition.getSchema(),
viewDefinition.getTable(),
viewDefinition.getBaseTables(),
viewDefinition.getOwner(),
viewDefinition.getColumnMappings(),
Optional.of(getPartitionedBy(viewMetadata.getProperties())));
Map<String, String> parameters = ImmutableMap.<String, String>builder()
.putAll(basicTable.getParameters())
.put(PRESTO_MATERIALIZED_VIEW_FLAG, "true")
Expand Down Expand Up @@ -2412,6 +2430,18 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN
}
}

@Override
public HiveInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return beginInsertInternal(session, tableHandle);
}

@Override
public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return finishInsertInternal(session, insertHandle, fragments, computedStatistics);
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,10 @@ private WriterParameters getWriterParametersForExistingUnpartitionedTable(Option

private WriterParameters getWriterParametersForExistingPartitionedTable(String partitionName, OptionalInt bucketNumber)
{
if (MetastoreUtil.isPrestoMaterializedView(table)) {
return getWriterParametersForOverwritePartition(partitionName);
}

switch (insertExistingPartitionsBehavior) {
case APPEND:
return getWriterParametersForAppendPartition(partitionName, bucketNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestIntegrationSmokeTest;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tests.QueryAssertions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -134,6 +135,7 @@
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.airlift.tpch.TpchTable.CUSTOMER;
import static io.airlift.tpch.TpchTable.LINE_ITEM;
import static io.airlift.tpch.TpchTable.NATION;
import static io.airlift.tpch.TpchTable.ORDERS;
import static io.airlift.tpch.TpchTable.PART_SUPPLIER;
import static java.lang.String.format;
Expand Down Expand Up @@ -183,7 +185,7 @@ protected TestHiveIntegrationSmokeTest(
protected QueryRunner createQueryRunner()
throws Exception
{
return HiveQueryRunner.createQueryRunner(ORDERS, CUSTOMER, LINE_ITEM, PART_SUPPLIER);
return HiveQueryRunner.createQueryRunner(ORDERS, CUSTOMER, LINE_ITEM, PART_SUPPLIER, NATION);
}

private List<?> getPartitions(HiveTableLayoutHandle tableLayoutHandle)
Expand Down Expand Up @@ -5537,6 +5539,104 @@ public void testDropMaterializedView()
computeActual("DROP TABLE IF EXISTS test_customer_base_4");
}

@Test
public void testRefreshMaterializedViewSimple()
{
Session session = getSession();
QueryRunner queryRunner = getQueryRunner();

computeActual(
"CREATE TABLE orders_partitioned WITH (partitioned_by = ARRAY['ds']) " +
"AS SELECT orderkey, orderpriority, '2020-01-01' as ds FROM orders WHERE orderkey < 1000 " +
"UNION ALL " +
"SELECT orderkey, orderpriority, '2019-01-02' as ds FROM orders WHERE orderkey > 1000");
computeActual(
"CREATE MATERIALIZED VIEW test_orders_view WITH (partitioned_by = ARRAY['ds']) " +
"AS SELECT orderkey, orderpriority, ds FROM orders_partitioned");

String refreshSql = "REFRESH MATERIALIZED VIEW test_orders_view WHERE ds='2020-01-01'";
String expectedInsertQuery = "SELECT orderkey, orderpriority, ds " +
"FROM (" +
" SELECT * FROM orders_partitioned WHERE ds='2020-01-01'" +
") orders_partitioned";
QueryAssertions.assertQuery(
queryRunner,
session,
refreshSql,
queryRunner,
"SELECT COUNT(*) FROM ( " + expectedInsertQuery + " )",
false, true);
}

@Test
public void testRefreshMaterializedView()
{
Session session = getSession();
QueryRunner queryRunner = getQueryRunner();

computeActual("CREATE TABLE test_nation_base_5 WITH (partitioned_by = ARRAY['nationkey', 'regionkey']) AS SELECT name, nationkey, regionkey FROM nation");
computeActual("CREATE TABLE test_customer_base_5 WITH (partitioned_by = ARRAY['nationkey']) AS SELECT custkey, name, mktsegment, nationkey FROM customer");
computeActual(
"CREATE MATERIALIZED VIEW test_customer_view_5 WITH (partitioned_by = ARRAY['marketsegment', 'nationkey', 'regionkey']" + retentionDays(30) + ") " +
"AS SELECT test_nation_base_5.name AS nationname, customer.custkey, customer.name AS customername, UPPER(customer.mktsegment) AS marketsegment, customer.nationkey, regionkey " +
"FROM test_nation_base_5 JOIN test_customer_base_5 customer ON (test_nation_base_5.nationkey = customer.nationkey)");

// Test predicate columns from two base tables
String refreshSql = "REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE marketsegment = 'AUTOMOBILE' AND nationkey = 24 AND regionkey = 1";
String expectedInsertQuery = "SELECT *" +
"FROM (" +
" SELECT nation.name AS nationname, customer.custkey, customer.name AS customername, UPPER(customer.mktsegment) AS marketsegment, customer.nationkey, regionkey " +
" FROM (" +
" SELECT * FROM test_nation_base_5 WHERE regionkey = 1" +
" ) nation JOIN (" +
" SELECT * FROM test_customer_base_5 WHERE nationkey = 24" +
" ) customer ON (nation.nationkey = customer.nationkey)" +
") WHERE marketsegment = 'AUTOMOBILE'";
QueryAssertions.assertQuery(
Comment thread
gggrace14 marked this conversation as resolved.
Outdated
queryRunner,
session,
refreshSql,
queryRunner,
"SELECT COUNT(*) FROM ( " + expectedInsertQuery + " )",
false, true);

// Test predicate columns from one base table
refreshSql = "REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE marketsegment = 'AUTOMOBILE' AND nationkey = 24";
expectedInsertQuery = "SELECT *" +
"FROM (" +
" SELECT nation.name AS nationname, customer.custkey, customer.name AS customername, UPPER(customer.mktsegment) AS marketsegment, customer.nationkey, regionkey " +
" FROM test_nation_base_5 nation JOIN (" +
" SELECT * FROM test_customer_base_5 WHERE nationkey = 24" +
" ) customer ON (nation.nationkey = customer.nationkey)" +
") WHERE marketsegment = 'AUTOMOBILE'";
QueryAssertions.assertQuery(
queryRunner,
session,
refreshSql,
queryRunner,
"SELECT COUNT(*) FROM ( " + expectedInsertQuery + " )",
false, true);

refreshSql = "REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE regionkey = 1";
expectedInsertQuery = "SELECT nation.name AS nationname, customer.custkey, customer.name AS customername, UPPER(customer.mktsegment) AS marketsegment, customer.nationkey, regionkey " +
"FROM (" +
" SELECT * FROM test_nation_base_5 WHERE regionkey = 1" +
") nation JOIN test_customer_base_5 customer ON (nation.nationkey = customer.nationkey)";
QueryAssertions.assertQuery(
queryRunner,
session,
refreshSql,
queryRunner,
"SELECT COUNT(*) FROM ( " + expectedInsertQuery + " )",
false, true);

// Test invalid predicates
assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE nationname = 'UNITED STATES'", ".*Refresh materialized view by column nationname is not supported.*");
assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE regionkey = 1 OR nationkey = 24", ".*Only logical AND is supported in WHERE clause.*");
assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE regionkey + nationkey = 25", ".*Only columns specified on literals are supported in WHERE clause.*");
assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5", ".*mismatched input '<EOF>'\\. Expecting: '\\.', 'WHERE'.*");
}

protected String retentionDays(int days)
{
return "";
Expand Down
Loading