diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveLocationService.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveLocationService.java
index 2bddcbf2928da..0cee4b1b9fd8c 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveLocationService.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveLocationService.java
@@ -37,7 +37,6 @@
 import static com.facebook.presto.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY;
 import static com.facebook.presto.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY;
 import static com.facebook.presto.hive.LocationHandle.WriteMode.STAGE_AND_MOVE_TO_TARGET_DIRECTORY;
-import static com.facebook.presto.hive.metastore.MetastoreUtil.createDirectory;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.pathExists;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
@@ -55,16 +54,16 @@ public HiveLocationService(HdfsEnvironment hdfsEnvironment)
     }

     @Override
-    public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, boolean tempPathRequired)
+    public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, boolean tempPathRequired, Optional<Path> externalLocation)
     {
-        Path targetPath = getTableDefaultLocation(session, metastore, hdfsEnvironment, schemaName, tableName);
+        Path targetPath = externalLocation.orElseGet(() -> getTableDefaultLocation(session, metastore, hdfsEnvironment, schemaName, tableName));
         HdfsContext context = new HdfsContext(session, schemaName, tableName, targetPath.toString(), true);

         // verify the target directory for the table
         if (pathExists(context, hdfsEnvironment, targetPath)) {
             throw new PrestoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath));
         }
-        return createLocationHandle(context, session, targetPath, NEW, tempPathRequired);
+        return createLocationHandle(context, session, targetPath, NEW, tempPathRequired, externalLocation);
     }

     @Override
@@ -73,7 +72,7 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
         String tablePath = table.getStorage().getLocation();
         HdfsContext context = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), tablePath, false);
         Path targetPath = new Path(tablePath);
-        return createLocationHandle(context, session, targetPath, EXISTING, tempPathRequired);
+        return createLocationHandle(context, session, targetPath, EXISTING, tempPathRequired, Optional.empty());
     }

     @Override
@@ -91,12 +90,12 @@ public LocationHandle forTemporaryTable(SemiTransactionalHiveMetastore metastore
                 DIRECT_TO_TARGET_NEW_DIRECTORY);
     }

-    private LocationHandle createLocationHandle(HdfsContext context, ConnectorSession session, Path targetPath, TableType tableType, boolean tempPathRequired)
+    private LocationHandle createLocationHandle(HdfsContext context, ConnectorSession session, Path targetPath, TableType tableType, boolean tempPathRequired, Optional<Path> externalLocation)
     {
         Optional<Path> tempPath = tempPathRequired ?
Optional.of(createTemporaryPath(session, context, hdfsEnvironment, targetPath)) : Optional.empty(); - if (shouldUseTemporaryDirectory(session, context, targetPath)) { + + if (shouldUseTemporaryDirectory(session, context, targetPath, externalLocation)) { Path writePath = createTemporaryPath(session, context, hdfsEnvironment, targetPath); - createDirectory(context, hdfsEnvironment, writePath); return new LocationHandle(targetPath, writePath, tempPath, tableType, STAGE_AND_MOVE_TO_TARGET_DIRECTORY); } if (tableType.equals(EXISTING)) { @@ -105,11 +104,13 @@ private LocationHandle createLocationHandle(HdfsContext context, ConnectorSessio return new LocationHandle(targetPath, targetPath, tempPath, tableType, DIRECT_TO_TARGET_NEW_DIRECTORY); } - private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path) + private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path, Optional externalLocation) { return isTemporaryStagingDirectoryEnabled(session) // skip using temporary directory for S3 - && !isS3FileSystem(context, hdfsEnvironment, path); + && !isS3FileSystem(context, hdfsEnvironment, path) + // Skip using temporary directory if destination is external. Target may be on a different file system. + && !externalLocation.isPresent(); } @Override diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index 47c53c74e5949..823d26f5d9153 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -276,6 +276,7 @@ import static com.facebook.presto.hive.HiveUtil.translateHiveUnsupportedTypesForTemporaryTable; import static com.facebook.presto.hive.HiveUtil.verifyPartitionTypeSupported; import static com.facebook.presto.hive.HiveWriteUtils.checkTableIsWritable; +import static com.facebook.presto.hive.HiveWriteUtils.isS3FileSystem; import static com.facebook.presto.hive.HiveWriteUtils.isWritableType; import static com.facebook.presto.hive.HiveWriterFactory.computeBucketedFileName; import static com.facebook.presto.hive.HiveWriterFactory.getFileExtension; @@ -368,19 +369,23 @@ public class HiveMetadata private static final String PRESTO_TEMPORARY_TABLE_NAME_PREFIX = "__presto_temporary_table_"; // Comma is not a reserved keyword with or without quote - // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Keywords,Non-reservedKeywordsandReservedKeywords + // See + // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Keywords,Non-reservedKeywordsandReservedKeywords private static final char COMMA = ','; // 1009 is chosen as a prime number greater than 1000. - // This is because Hive bucket function can result in skewed distribution when bucket number of power of 2 - // TODO: Use a regular number once better hash function is used for table write shuffle partitioning. + // This is because Hive bucket function can result in skewed distribution when + // bucket number of power of 2 + // TODO: Use a regular number once better hash function is used for table write + // shuffle partitioning. 
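As a note on the HiveLocationService change above: the write-mode selection after this patch can be summarized by the following minimal, self-contained sketch. It is illustrative only; the enum values mirror LocationHandle.WriteMode, but the booleans (stagingEnabled, targetIsS3, targetExists) are hypothetical stand-ins for isTemporaryStagingDirectoryEnabled(session), isS3FileSystem(...), and the EXISTING table-type check.

import java.util.Optional;

public final class WriteModeSketch
{
    enum WriteMode { STAGE_AND_MOVE_TO_TARGET_DIRECTORY, DIRECT_TO_TARGET_EXISTING_DIRECTORY, DIRECT_TO_TARGET_NEW_DIRECTORY }

    // Mirrors the decision in createLocationHandle/shouldUseTemporaryDirectory after this patch:
    // staging is skipped for S3 targets and, newly, for external locations, because an external
    // target may live on a different file system than the staging directory.
    static WriteMode chooseWriteMode(boolean stagingEnabled, boolean targetIsS3, boolean targetExists, Optional<String> externalLocation)
    {
        if (stagingEnabled && !targetIsS3 && !externalLocation.isPresent()) {
            return WriteMode.STAGE_AND_MOVE_TO_TARGET_DIRECTORY;
        }
        return targetExists ? WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY : WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY;
    }

    public static void main(String[] args)
    {
        // External CTAS target: write directly, no staging/rename step.
        System.out.println(chooseWriteMode(true, false, false, Optional.of("hdfs://other-cluster/warehouse/ext")));
        // Managed table on a staging-friendly file system: stage and move into place on commit.
        System.out.println(chooseWriteMode(true, false, false, Optional.empty()));
    }
}

This also explains why the explicit createDirectory(...) call for the staging path is dropped above: the staging directory is only used on file systems where the writer can create it as part of the normal write path.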
private static final int SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE = 1009; private static final String CSV_SEPARATOR_KEY = OpenCSVSerde.SEPARATORCHAR; private static final String CSV_QUOTE_KEY = OpenCSVSerde.QUOTECHAR; private static final String CSV_ESCAPE_KEY = OpenCSVSerde.ESCAPECHAR; - private static final JsonCodec MATERIALIZED_VIEW_JSON_CODEC = jsonCodec(ConnectorMaterializedViewDefinition.class); + private static final JsonCodec MATERIALIZED_VIEW_JSON_CODEC = jsonCodec( + ConnectorMaterializedViewDefinition.class); private final boolean allowCorruptWritesForTesting; private final SemiTransactionalHiveMetastore metastore; @@ -445,7 +450,8 @@ public HiveMetadata( this.locationService = requireNonNull(locationService, "locationService is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); - this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null"); + this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, + "filterStatsCalculatorService is null"); this.tableParameterCodec = requireNonNull(tableParameterCodec, "tableParameterCodec is null"); this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null"); this.partitionUpdateSmileCodec = requireNonNull(partitionUpdateSmileCodec, "partitionUpdateSmileCodec is null"); @@ -458,7 +464,8 @@ public HiveMetadata( this.stagingFileCommitter = requireNonNull(stagingFileCommitter, "stagingFileCommitter is null"); this.zeroRowFileCreator = requireNonNull(zeroRowFileCreator, "zeroRowFileCreator is null"); this.partitionObjectBuilder = requireNonNull(partitionObjectBuilder, "partitionObjectBuilder is null"); - this.encryptionInformationProvider = requireNonNull(encryptionInformationProvider, "encryptionInformationProvider is null"); + this.encryptionInformationProvider = requireNonNull(encryptionInformationProvider, + "encryptionInformationProvider is null"); this.hivePartitionStats = requireNonNull(hivePartitionStats, "hivePartitionStats is null"); this.hiveFileRenamer = requireNonNull(hiveFileRenamer, "hiveFileRenamer is null"); } @@ -479,13 +486,15 @@ public HiveTableHandle getTableHandle(ConnectorSession session, SchemaTableName { requireNonNull(tableName, "tableName is null"); MetastoreContext metastoreContext = getMetastoreContext(session); - Optional table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()); + Optional
table = metastore.getTable(metastoreContext, tableName.getSchemaName(), + tableName.getTableName()); if (!table.isPresent()) { return null; } if (getSourceTableNameFromSystemTable(tableName).isPresent()) { - // We must not allow system table due to how permissions are checked in SystemTableAwareAccessControl.checkCanSelectFromTable() + // We must not allow system table due to how permissions are checked in + // SystemTableAwareAccessControl.checkCanSelectFromTable() throw new PrestoException(NOT_SUPPORTED, "Unexpected table present in Hive metastore: " + tableName); } @@ -497,7 +506,8 @@ public HiveTableHandle getTableHandle(ConnectorSession session, SchemaTableName } @Override - public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSession session, SchemaTableName tableName, Map analyzeProperties) + public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSession session, + SchemaTableName tableName, Map analyzeProperties) { HiveTableHandle handle = getTableHandle(session, tableName); if (handle == null) { @@ -510,7 +520,8 @@ public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSessi List partitionedBy = getPartitionedBy(tableMetadata.getProperties()); if (partitionValuesList.isPresent() && partitionedBy.isEmpty()) { - throw new PrestoException(INVALID_ANALYZE_PROPERTY, "Only partitioned table can be analyzed with a partition list"); + throw new PrestoException(INVALID_ANALYZE_PROPERTY, + "Only partitioned table can be analyzed with a partition list"); } return handle; } @@ -519,18 +530,22 @@ public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSessi public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) { if (SystemTableHandler.PARTITIONS.matches(tableName)) { - return getPartitionsSystemTable(session, tableName, SystemTableHandler.PARTITIONS.getSourceTableName(tableName)); + return getPartitionsSystemTable(session, tableName, + SystemTableHandler.PARTITIONS.getSourceTableName(tableName)); } if (SystemTableHandler.PROPERTIES.matches(tableName)) { - return getPropertiesSystemTable(session, tableName, SystemTableHandler.PROPERTIES.getSourceTableName(tableName)); + return getPropertiesSystemTable(session, tableName, + SystemTableHandler.PROPERTIES.getSourceTableName(tableName)); } return Optional.empty(); } - private Optional getPropertiesSystemTable(ConnectorSession session, SchemaTableName tableName, SchemaTableName sourceTableName) + private Optional getPropertiesSystemTable(ConnectorSession session, SchemaTableName tableName, + SchemaTableName sourceTableName) { MetastoreContext metastoreContext = getMetastoreContext(session); - Optional
table = metastore.getTable(metastoreContext, sourceTableName.getSchemaName(), sourceTableName.getTableName()); + Optional
table = metastore.getTable(metastoreContext, sourceTableName.getSchemaName(), + sourceTableName.getTableName()); if (!table.isPresent() || table.get().getTableType().equals(VIRTUAL_VIEW)) { throw new TableNotFoundException(tableName); } @@ -543,10 +558,12 @@ private Optional getPropertiesSystemTable(ConnectorSession session, .collect(toImmutableList()); Iterable> propertyValues = ImmutableList.of(ImmutableList.copyOf(sortedTableParameters.values())); - return Optional.of(createSystemTable(new ConnectorTableMetadata(sourceTableName, columns), constraint -> new InMemoryRecordSet(types, propertyValues).cursor())); + return Optional.of(createSystemTable(new ConnectorTableMetadata(sourceTableName, columns), + constraint -> new InMemoryRecordSet(types, propertyValues).cursor())); } - private Optional getPartitionsSystemTable(ConnectorSession session, SchemaTableName tableName, SchemaTableName sourceTableName) + private Optional getPartitionsSystemTable(ConnectorSession session, SchemaTableName tableName, + SchemaTableName sourceTableName) { HiveTableHandle sourceTableHandle = getTableHandle(session, sourceTableName); @@ -555,7 +572,8 @@ private Optional getPartitionsSystemTable(ConnectorSession session, } MetastoreContext metastoreContext = getMetastoreContext(session); - Table sourceTable = metastore.getTable(metastoreContext, sourceTableName.getSchemaName(), sourceTableName.getTableName()) + Table sourceTable = metastore + .getTable(metastoreContext, sourceTableName.getSchemaName(), sourceTableName.getTableName()) .orElseThrow(() -> new TableNotFoundException(sourceTableName)); List partitionColumns = getPartitionKeyColumnHandles(sourceTable); if (partitionColumns.isEmpty()) { @@ -575,10 +593,9 @@ private Optional getPartitionsSystemTable(ConnectorSession session, column.isHidden())) .collect(toImmutableList()); - Map fieldIdToColumnHandle = - IntStream.range(0, partitionColumns.size()) - .boxed() - .collect(toImmutableMap(identity(), partitionColumns::get)); + Map fieldIdToColumnHandle = IntStream.range(0, partitionColumns.size()) + .boxed() + .collect(toImmutableMap(identity(), partitionColumns::get)); return Optional.of(createSystemTable( new ConnectorTableMetadata(tableName, partitionSystemTableColumns), @@ -586,14 +603,14 @@ private Optional getPartitionsSystemTable(ConnectorSession session, TupleDomain targetTupleDomain = constraint.transform(fieldIdToColumnHandle::get); Predicate> targetPredicate = convertToPredicate(targetTupleDomain); Constraint targetConstraint = new Constraint(targetTupleDomain, targetPredicate); - Iterable> records = () -> - stream(partitionManager.getPartitionsIterator(metastore, sourceTableHandle, targetConstraint, session)) - .map(hivePartition -> - IntStream.range(0, partitionColumns.size()) - .mapToObj(fieldIdToColumnHandle::get) - .map(columnHandle -> ((HivePartition) hivePartition).getKeys().get(columnHandle).getValue()) - .collect(toList())) - .iterator(); + Iterable> records = () -> stream(partitionManager.getPartitionsIterator(metastore, + sourceTableHandle, targetConstraint, session)) + .map(hivePartition -> IntStream.range(0, partitionColumns.size()) + .mapToObj(fieldIdToColumnHandle::get) + .map(columnHandle -> ((HivePartition) hivePartition).getKeys() + .get(columnHandle).getValue()) + .collect(toList())) + .iterator(); return new InMemoryRecordSet(partitionColumnTypes, records).cursor(); })); @@ -608,12 +625,14 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect private ConnectorTableMetadata 
getTableMetadata(ConnectorSession session, SchemaTableName tableName) { MetastoreContext metastoreContext = getMetastoreContext(session); - Optional
table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()); + Optional
table = metastore.getTable(metastoreContext, tableName.getSchemaName(), + tableName.getTableName()); if (!table.isPresent() || table.get().getTableType().equals(VIRTUAL_VIEW)) { throw new TableNotFoundException(tableName); } - Function metadataGetter = columnMetadataGetter(table.get(), typeManager, metastoreContext.getColumnConverter()); + Function metadataGetter = columnMetadataGetter(table.get(), typeManager, + metastoreContext.getColumnConverter()); ImmutableList.Builder columns = ImmutableList.builder(); for (HiveColumnHandle columnHandle : hiveColumnHandles(table.get())) { columns.add(metadataGetter.apply(columnHandle)); @@ -635,7 +654,8 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema // todo fail if format is not known } - getTableEncryptionPropertiesFromHiveProperties(table.get().getParameters(), format).map(TableEncryptionProperties::toTableProperties).ifPresent(properties::putAll); + getTableEncryptionPropertiesFromHiveProperties(table.get().getParameters(), format) + .map(TableEncryptionProperties::toTableProperties).ifPresent(properties::putAll); // Partitioning property List partitionedBy = table.get().getPartitionColumns().stream() @@ -654,10 +674,12 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema }); // Preferred ordering columns - List preferredOrderingColumns = decodePreferredOrderingColumnsFromStorage(table.get().getStorage()); + List preferredOrderingColumns = decodePreferredOrderingColumnsFromStorage( + table.get().getStorage()); if (!preferredOrderingColumns.isEmpty()) { if (bucketProperty.isPresent()) { - throw new PrestoException(HIVE_INVALID_METADATA, format("bucketed table %s should not specify preferred_ordering_columns", tableName)); + throw new PrestoException(HIVE_INVALID_METADATA, + format("bucketed table %s should not specify preferred_ordering_columns", tableName)); } properties.put(PREFERRED_ORDERING_COLUMNS, preferredOrderingColumns); } @@ -665,7 +687,8 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema // ORC format specific properties String orcBloomFilterColumns = table.get().getParameters().get(ORC_BLOOM_FILTER_COLUMNS_KEY); if (orcBloomFilterColumns != null) { - properties.put(ORC_BLOOM_FILTER_COLUMNS, Splitter.on(COMMA).trimResults().omitEmptyStrings().splitToList(orcBloomFilterColumns)); + properties.put(ORC_BLOOM_FILTER_COLUMNS, + Splitter.on(COMMA).trimResults().omitEmptyStrings().splitToList(orcBloomFilterColumns)); } String orcBloomFilterFfp = table.get().getParameters().get(ORC_BLOOM_FILTER_FPP_KEY); if (orcBloomFilterFfp != null) { @@ -698,7 +721,8 @@ private static Optional getCsvSerdeProperty(Table table, String key) { return getSerdeProperty(table, key).map(csvSerdeProperty -> { if (csvSerdeProperty.length() > 1) { - throw new PrestoException(HIVE_INVALID_METADATA, "Only single character can be set for property: " + key); + throw new PrestoException(HIVE_INVALID_METADATA, + "Only single character can be set for property: " + key); } return csvSerdeProperty; }); @@ -708,16 +732,20 @@ private static Optional getSerdeProperty(Table table, String key) { String serdePropertyValue = table.getStorage().getSerdeParameters().get(key); String tablePropertyValue = table.getParameters().get(key); - if (serdePropertyValue != null && tablePropertyValue != null && !tablePropertyValue.equals(serdePropertyValue)) { - // in Hive one can set conflicting values for the same property, in such case it looks like table properties are used + if 
(serdePropertyValue != null && tablePropertyValue != null + && !tablePropertyValue.equals(serdePropertyValue)) { + // in Hive one can set conflicting values for the same property, in such case it + // looks like table properties are used throw new PrestoException( HIVE_INVALID_METADATA, - format("Different values for '%s' set in serde properties and table properties: '%s' and '%s'", key, serdePropertyValue, tablePropertyValue)); + format("Different values for '%s' set in serde properties and table properties: '%s' and '%s'", key, + serdePropertyValue, tablePropertyValue)); } return firstNonNullable(tablePropertyValue, serdePropertyValue); } - protected Optional getTableEncryptionPropertiesFromHiveProperties(Map parameters, HiveStorageFormat storageFormat) + protected Optional getTableEncryptionPropertiesFromHiveProperties( + Map parameters, HiveStorageFormat storageFormat) { if (storageFormat != DWRF) { return Optional.empty(); @@ -775,7 +803,8 @@ public Map getColumnHandles(ConnectorSession session, Conn @SuppressWarnings("TryWithIdenticalCatches") @Override - public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) + public Map> listTableColumns(ConnectorSession session, + SchemaTablePrefix prefix) { requireNonNull(prefix, "prefix is null"); ImmutableMap.Builder> columns = ImmutableMap.builder(); @@ -794,7 +823,9 @@ public Map> listTableColumns(ConnectorSess } @Override - public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Optional tableLayoutHandle, List columnHandles, Constraint constraint) + public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, + Optional tableLayoutHandle, List columnHandles, + Constraint constraint) { if (!isStatisticsEnabled(session)) { return TableStatistics.empty(); @@ -802,16 +833,20 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab // TODO Adjust statistics to take into account required subfields - if (!tableLayoutHandle.isPresent() || !((HiveTableLayoutHandle) tableLayoutHandle.get()).isPushdownFilterEnabled()) { + if (!tableLayoutHandle.isPresent() + || !((HiveTableLayoutHandle) tableLayoutHandle.get()).isPushdownFilterEnabled()) { Map columns = columnHandles.stream() .map(HiveColumnHandle.class::cast) .filter(not(HiveColumnHandle::isHidden)) .collect(toImmutableMap(HiveColumnHandle::getName, Function.identity())); Map columnTypes = columns.entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> getColumnMetadata(session, tableHandle, entry.getValue()).getType())); - List partitions = partitionManager.getPartitions(metastore, tableHandle, constraint, session).getPartitions(); - return hiveStatisticsProvider.getTableStatistics(session, ((HiveTableHandle) tableHandle).getSchemaTableName(), columns, columnTypes, partitions); + .collect(toImmutableMap(Map.Entry::getKey, + entry -> getColumnMetadata(session, tableHandle, entry.getValue()).getType())); + List partitions = partitionManager.getPartitions(metastore, tableHandle, constraint, session) + .getPartitions(); + return hiveStatisticsProvider.getTableStatistics(session, + ((HiveTableHandle) tableHandle).getSchemaTableName(), columns, columnTypes, partitions); } verify(!constraint.predicate().isPresent()); @@ -830,26 +865,35 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab .collect(toImmutableList())) .build(); - Map allColumns = Maps.uniqueIndex(allColumnHandles, column -> ((HiveColumnHandle) 
column).getName()); + Map allColumns = Maps.uniqueIndex(allColumnHandles, + column -> ((HiveColumnHandle) column).getName()); Map allColumnTypes = allColumns.entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> getColumnMetadata(session, tableHandle, entry.getValue()).getType())); + .collect(toImmutableMap(Map.Entry::getKey, + entry -> getColumnMetadata(session, tableHandle, entry.getValue()).getType())); - Constraint combinedConstraint = new Constraint<>(constraint.getSummary().intersect(hiveLayoutHandle.getDomainPredicate() - .transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null) - .transform(allColumns::get))); + Constraint combinedConstraint = new Constraint<>(constraint.getSummary() + .intersect(hiveLayoutHandle.getDomainPredicate() + .transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null) + .transform(allColumns::get))); - SubfieldExtractor subfieldExtractor = new SubfieldExtractor(functionResolution, rowExpressionService.getExpressionOptimizer(), session); + SubfieldExtractor subfieldExtractor = new SubfieldExtractor(functionResolution, + rowExpressionService.getExpressionOptimizer(), session); RowExpression domainPredicate = rowExpressionService.getDomainTranslator().toPredicate( hiveLayoutHandle.getDomainPredicate() - .transform(subfield -> subfieldExtractor.toRowExpression(subfield, allColumnTypes.get(subfield.getRootName())))); - RowExpression combinedPredicate = binaryExpression(SpecialFormExpression.Form.AND, ImmutableList.of(hiveLayoutHandle.getRemainingPredicate(), domainPredicate)); + .transform(subfield -> subfieldExtractor.toRowExpression(subfield, + allColumnTypes.get(subfield.getRootName())))); + RowExpression combinedPredicate = binaryExpression(SpecialFormExpression.Form.AND, + ImmutableList.of(hiveLayoutHandle.getRemainingPredicate(), domainPredicate)); - List partitions = partitionManager.getPartitions(metastore, tableHandle, combinedConstraint, session).getPartitions(); - TableStatistics tableStatistics = hiveStatisticsProvider.getTableStatistics(session, ((HiveTableHandle) tableHandle).getSchemaTableName(), allColumns, allColumnTypes, partitions); + List partitions = partitionManager + .getPartitions(metastore, tableHandle, combinedConstraint, session).getPartitions(); + TableStatistics tableStatistics = hiveStatisticsProvider.getTableStatistics(session, + ((HiveTableHandle) tableHandle).getSchemaTableName(), allColumns, allColumnTypes, partitions); - return filterStatsCalculatorService.filterStats(tableStatistics, combinedPredicate, session, ImmutableBiMap.copyOf(allColumns).inverse(), allColumnTypes); + return filterStatsCalculatorService.filterStats(tableStatistics, combinedPredicate, session, + ImmutableBiMap.copyOf(allColumns).inverse(), allColumnTypes); } private List listTables(ConnectorSession session, SchemaTablePrefix prefix) @@ -864,7 +908,8 @@ private List listTables(ConnectorSession session, SchemaTablePr * NOTE: This method does not return column comment */ @Override - public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, + ColumnHandle columnHandle) { return ((HiveColumnHandle) columnHandle).getColumnMetadata(typeManager); } @@ -875,9 +920,11 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable * Only Hive partition columns that are used in IO planning. 
*/ @Override - public TupleDomain toExplainIOConstraints(ConnectorSession session, ConnectorTableHandle tableHandle, TupleDomain constraints) + public TupleDomain toExplainIOConstraints(ConnectorSession session, ConnectorTableHandle tableHandle, + TupleDomain constraints) { - return constraints.transform(columnHandle -> ((HiveColumnHandle) columnHandle).isPartitionKey() ? columnHandle : null); + return constraints + .transform(columnHandle -> ((HiveColumnHandle) columnHandle).isPartitionKey() ? columnHandle : null); } @Override @@ -926,7 +973,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe PrestoTableType tableType = isExternalTable(tableMetadata.getProperties()) ? EXTERNAL_TABLE : MANAGED_TABLE; Table table = prepareTable(session, tableMetadata, tableType); PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner()); - HiveBasicStatistics basicStatistics = table.getPartitionColumns().isEmpty() ? createZeroStatistics() : createEmptyStatistics(); + HiveBasicStatistics basicStatistics = table.getPartitionColumns().isEmpty() ? createZeroStatistics() + : createEmptyStatistics(); metastore.createTable( session, @@ -937,7 +985,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe new PartitionStatistics(basicStatistics, ImmutableMap.of())); } - private Table prepareTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, PrestoTableType tableType) + private Table prepareTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, + PrestoTableType tableType) { SchemaTableName schemaTableName = tableMetadata.getTable(); String schemaName = schemaTableName.getSchemaName(); @@ -945,19 +994,24 @@ private Table prepareTable(ConnectorSession session, ConnectorTableMetadata tabl List partitionedBy = getPartitionedBy(tableMetadata.getProperties()); Optional bucketProperty = getBucketProperty(tableMetadata.getProperties()); - if ((bucketProperty.isPresent() || !partitionedBy.isEmpty()) && getAvroSchemaUrl(tableMetadata.getProperties()) != null) { - throw new PrestoException(NOT_SUPPORTED, "Bucketing/Partitioning columns not supported when Avro schema url is set"); + if ((bucketProperty.isPresent() || !partitionedBy.isEmpty()) + && getAvroSchemaUrl(tableMetadata.getProperties()) != null) { + throw new PrestoException(NOT_SUPPORTED, + "Bucketing/Partitioning columns not supported when Avro schema url is set"); } - List columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator); + List columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), + typeTranslator); HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties()); List preferredOrderingColumns = getPreferredOrderingColumns(tableMetadata.getProperties()); - Optional tableEncryptionProperties = getTableEncryptionPropertiesFromTableProperties(tableMetadata, hiveStorageFormat, partitionedBy); + Optional tableEncryptionProperties = getTableEncryptionPropertiesFromTableProperties( + tableMetadata, hiveStorageFormat, partitionedBy); if (tableEncryptionProperties.isPresent() && partitionedBy.isEmpty()) { - throw new PrestoException(HIVE_UNSUPPORTED_ENCRYPTION_OPERATION, "Creating an encrypted table without partitions is not supported. 
Use CREATE TABLE AS SELECT to " + - "create an encrypted table without partitions"); + throw new PrestoException(HIVE_UNSUPPORTED_ENCRYPTION_OPERATION, + "Creating an encrypted table without partitions is not supported. Use CREATE TABLE AS SELECT to " + + "create an encrypted table without partitions"); } validateColumns(hiveStorageFormat, columnHandles); @@ -977,10 +1031,12 @@ private Table prepareTable(ConnectorSession session, ConnectorTableMetadata tabl throw new PrestoException(NOT_SUPPORTED, "Cannot create non-managed Hive table"); } String externalLocation = getExternalLocation(tableMetadata.getProperties()); - targetPath = getExternalPath(new HdfsContext(session, schemaName, tableName, externalLocation, true), externalLocation); + targetPath = getExternalLocationAsPath(externalLocation); + checkExternalPath(new HdfsContext(session, schemaName, tableName), targetPath); } else if (tableType.equals(MANAGED_TABLE) || tableType.equals(MATERIALIZED_VIEW)) { - LocationHandle locationHandle = locationService.forNewTable(metastore, session, schemaName, tableName, isTempPathRequired(session, bucketProperty, preferredOrderingColumns)); + LocationHandle locationHandle = locationService.forNewTable(metastore, session, schemaName, tableName, + isTempPathRequired(session, bucketProperty, preferredOrderingColumns), Optional.empty()); targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath(); } else { @@ -1011,7 +1067,8 @@ else if (tableType.equals(MANAGED_TABLE) || tableType.equals(MATERIALIZED_VIEW)) } @Override - public ConnectorTableHandle createTemporaryTable(ConnectorSession session, List columns, Optional partitioningMetadata) + public ConnectorTableHandle createTemporaryTable(ConnectorSession session, List columns, + Optional partitioningMetadata) { String schemaName = getTemporaryTableSchema(session); HiveStorageFormat storageFormat = getTemporaryTableStorageFormat(session); @@ -1061,7 +1118,8 @@ public ConnectorTableHandle createTemporaryTable(ConnectorSession session, List< } // PAGEFILE format doesn't require translation to hive type, - // choose HIVE_BINARY as a default hive type to make it compatible with Hive connector + // choose HIVE_BINARY as a default hive type to make it compatible with Hive + // connector Optional defaultHiveType = storageFormat == PAGEFILE ? 
Optional.of(HIVE_BINARY) : Optional.empty(); List columnHandles = getColumnHandles( @@ -1085,8 +1143,10 @@ public ConnectorTableHandle createTemporaryTable(ConnectorSession session, List< .setOwner(session.getUser()) .setTableType(TEMPORARY_TABLE) .setDataColumns(columnHandles.stream() - // Not propagating column type metadata because temp tables are not visible to users - .map(handle -> new Column(handle.getName(), handle.getHiveType(), handle.getComment(), Optional.empty())) + // Not propagating column type metadata because temp tables are not visible to + // users + .map(handle -> new Column(handle.getName(), handle.getHiveType(), handle.getComment(), + Optional.empty())) .collect(toImmutableList())) .withStorage(storage -> storage .setStorageFormat(fromHiveStorageFormat(finalStorageFormat)) @@ -1100,11 +1160,14 @@ public ConnectorTableHandle createTemporaryTable(ConnectorSession session, List< List hiveColumnHandles = hiveColumnHandles(table); Map columnTypes = hiveColumnHandles.stream() .filter(columnHandle -> !columnHandle.isHidden()) - .collect(toImmutableMap(HiveColumnHandle::getName, column -> column.getHiveType().getType(typeManager))); + .collect( + toImmutableMap(HiveColumnHandle::getName, column -> column.getHiveType().getType(typeManager))); Map> columnStatisticTypes = hiveColumnHandles.stream() .filter(columnHandle -> !partitionColumnNames.contains(columnHandle.getName())) .filter(column -> !column.isHidden()) - .collect(toImmutableMap(HiveColumnHandle::getName, column -> ImmutableSet.copyOf(getSupportedColumnStatisticsForTemporaryTable(typeManager.getType(column.getTypeSignature()))))); + .collect(toImmutableMap(HiveColumnHandle::getName, + column -> ImmutableSet.copyOf(getSupportedColumnStatisticsForTemporaryTable( + typeManager.getType(column.getTypeSignature()))))); metastore.createTable( session, @@ -1119,7 +1182,8 @@ public ConnectorTableHandle createTemporaryTable(ConnectorSession session, List< private Set getSupportedColumnStatisticsForTemporaryTable(Type type) { - // Temporary table statistics are not committed to metastore, so no need to call metastore for supported + // Temporary table statistics are not committed to metastore, so no need to call + // metastore for supported // column statistics, instead locally determine. 
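The comment above, that supported column statistics for temporary tables are determined locally rather than by asking the metastore, can be restated as a small hypothetical sketch; the statistic names below are stand-ins for ColumnStatisticType constants, and the per-type branches that follow in this diff remain the authoritative mapping.

import java.util.EnumSet;
import java.util.Set;

public final class TemporaryTableStatisticsSketch
{
    // Hypothetical stand-ins for the connector's ColumnStatisticType values.
    enum Stat { MIN_VALUE, MAX_VALUE, NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES, NUMBER_OF_TRUE_VALUES, TOTAL_SIZE_IN_BYTES, MAX_VALUE_SIZE_IN_BYTES }

    // Temporary-table statistics are never committed to the metastore, so the supported set is
    // chosen purely from the column type, in the spirit of getSupportedColumnStatisticsForTemporaryTable.
    static Set<Stat> supportedStats(String typeName)
    {
        switch (typeName) {
            case "boolean":
                return EnumSet.of(Stat.NUMBER_OF_NON_NULL_VALUES, Stat.NUMBER_OF_TRUE_VALUES);
            case "varchar":
            case "char":
                return EnumSet.of(Stat.NUMBER_OF_NON_NULL_VALUES, Stat.NUMBER_OF_DISTINCT_VALUES, Stat.TOTAL_SIZE_IN_BYTES, Stat.MAX_VALUE_SIZE_IN_BYTES);
            case "varbinary":
                return EnumSet.of(Stat.NUMBER_OF_NON_NULL_VALUES, Stat.TOTAL_SIZE_IN_BYTES, Stat.MAX_VALUE_SIZE_IN_BYTES);
            default:
                // Orderable/numeric types in the real code get min/max, NDV, and non-null counts.
                return EnumSet.of(Stat.MIN_VALUE, Stat.MAX_VALUE, Stat.NUMBER_OF_DISTINCT_VALUES, Stat.NUMBER_OF_NON_NULL_VALUES);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(supportedStats("boolean"));
        System.out.println(supportedStats("varchar"));
    }
}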
if (type.equals(BOOLEAN)) { return ImmutableSet.of(NUMBER_OF_NON_NULL_VALUES, NUMBER_OF_TRUE_VALUES); @@ -1128,7 +1192,8 @@ private Set getSupportedColumnStatisticsForTemporaryTable(T return ImmutableSet.of(MIN_VALUE, MAX_VALUE, NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES); } if (isVarcharType(type) || isCharType(type)) { - return ImmutableSet.of(NUMBER_OF_NON_NULL_VALUES, NUMBER_OF_DISTINCT_VALUES, TOTAL_SIZE_IN_BYTES, MAX_VALUE_SIZE_IN_BYTES); + return ImmutableSet.of(NUMBER_OF_NON_NULL_VALUES, NUMBER_OF_DISTINCT_VALUES, TOTAL_SIZE_IN_BYTES, + MAX_VALUE_SIZE_IN_BYTES); } if (type.equals(VARBINARY)) { return ImmutableSet.of(NUMBER_OF_NON_NULL_VALUES, TOTAL_SIZE_IN_BYTES, MAX_VALUE_SIZE_IN_BYTES); @@ -1156,17 +1221,21 @@ private static void validateAvroType(TypeInfo type, String columnName) if (type.getCategory() == ObjectInspector.Category.MAP) { TypeInfo keyType = mapTypeInfo(type).getMapKeyTypeInfo(); if ((keyType.getCategory() != ObjectInspector.Category.PRIMITIVE) || - (primitiveTypeInfo(keyType).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING)) { - throw new PrestoException(NOT_SUPPORTED, format("Column %s has a non-varchar map key, which is not supported by Avro", columnName)); + (primitiveTypeInfo(keyType) + .getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING)) { + throw new PrestoException(NOT_SUPPORTED, + format("Column %s has a non-varchar map key, which is not supported by Avro", columnName)); } } else if (type.getCategory() == ObjectInspector.Category.PRIMITIVE) { PrimitiveObjectInspector.PrimitiveCategory primitive = primitiveTypeInfo(type).getPrimitiveCategory(); if (primitive == PrimitiveObjectInspector.PrimitiveCategory.BYTE) { - throw new PrestoException(NOT_SUPPORTED, format("Column %s is tinyint, which is not supported by Avro. Use integer instead.", columnName)); + throw new PrestoException(NOT_SUPPORTED, format( + "Column %s is tinyint, which is not supported by Avro. Use integer instead.", columnName)); } if (primitive == PrimitiveObjectInspector.PrimitiveCategory.SHORT) { - throw new PrestoException(NOT_SUPPORTED, format("Column %s is smallint, which is not supported by Avro. Use integer instead.", columnName)); + throw new PrestoException(NOT_SUPPORTED, format( + "Column %s is smallint, which is not supported by Avro. 
Use integer instead.", columnName)); } } } @@ -1196,14 +1265,16 @@ private Map getEmptyTableProperties( List columns = getOrcBloomFilterColumns(tableMetadata.getProperties()); if (columns != null && !columns.isEmpty()) { tableProperties.put(ORC_BLOOM_FILTER_COLUMNS_KEY, Joiner.on(COMMA).join(columns)); - tableProperties.put(ORC_BLOOM_FILTER_FPP_KEY, String.valueOf(getOrcBloomFilterFpp(tableMetadata.getProperties()))); + tableProperties.put(ORC_BLOOM_FILTER_FPP_KEY, + String.valueOf(getOrcBloomFilterFpp(tableMetadata.getProperties()))); } // Avro specific properties String avroSchemaUrl = getAvroSchemaUrl(tableMetadata.getProperties()); if (avroSchemaUrl != null) { if (hiveStorageFormat != AVRO) { - throw new PrestoException(INVALID_TABLE_PROPERTY, format("Cannot specify %s table property for storage format: %s", AVRO_SCHEMA_URL, hiveStorageFormat)); + throw new PrestoException(INVALID_TABLE_PROPERTY, format( + "Cannot specify %s table property for storage format: %s", AVRO_SCHEMA_URL, hiveStorageFormat)); } tableProperties.put(AVRO_SCHEMA_URL_KEY, validateAndNormalizeAvroSchemaUrl(avroSchemaUrl, hdfsContext)); } @@ -1229,15 +1300,18 @@ private Map getEmptyTableProperties( tableMetadata.getComment().ifPresent(value -> tableProperties.put(TABLE_COMMENT, value)); // Encryption specific settings - tableProperties.putAll(tableEncryptionProperties.map(TableEncryptionProperties::toHiveProperties).orElseGet(ImmutableMap::of)); + tableProperties.putAll( + tableEncryptionProperties.map(TableEncryptionProperties::toHiveProperties).orElseGet(ImmutableMap::of)); return tableProperties.build(); } - private static void checkFormatForProperty(HiveStorageFormat actualStorageFormat, HiveStorageFormat expectedStorageFormat, String propertyName) + private static void checkFormatForProperty(HiveStorageFormat actualStorageFormat, + HiveStorageFormat expectedStorageFormat, String propertyName) { if (actualStorageFormat != expectedStorageFormat) { - throw new PrestoException(INVALID_TABLE_PROPERTY, format("Cannot specify %s table property for storage format: %s", propertyName, actualStorageFormat)); + throw new PrestoException(INVALID_TABLE_PROPERTY, format( + "Cannot specify %s table property for storage format: %s", propertyName, actualStorageFormat)); } } @@ -1261,7 +1335,8 @@ private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context return url; } catch (IOException ex) { - throw new PrestoException(INVALID_TABLE_PROPERTY, "Avro schema file is not a valid file system URI: " + url, ex); + throw new PrestoException(INVALID_TABLE_PROPERTY, + "Avro schema file is not a valid file system URI: " + url, ex); } } catch (IOException e) { @@ -1269,17 +1344,29 @@ private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context } } - private Path getExternalPath(HdfsContext context, String location) + private static Path getExternalLocationAsPath(String location) + { + try { + return new Path(location); + } + catch (IllegalArgumentException e) { + throw new PrestoException(INVALID_TABLE_PROPERTY, + "External location is not a valid file system URI: " + location, e); + } + } + + private void checkExternalPath(HdfsContext context, Path path) { try { - Path path = new Path(location); - if (!hdfsEnvironment.getFileSystem(context, path).isDirectory(path)) { - throw new PrestoException(INVALID_TABLE_PROPERTY, "External location must be a directory"); + if (!isS3FileSystem(context, hdfsEnvironment, path)) { + if (!hdfsEnvironment.getFileSystem(context, path).isDirectory(path)) { + throw new 
PrestoException(INVALID_TABLE_PROPERTY, "External location must be a directory: " + path); + } } - return path; } - catch (IllegalArgumentException | IOException e) { - throw new PrestoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI", e); + catch (IOException e) { + throw new PrestoException(INVALID_TABLE_PROPERTY, + "External location is not a valid file system URI: " + path, e); } } @@ -1320,7 +1407,8 @@ private static Table buildTableObject( String name = columnHandle.getName(); HiveType type = columnHandle.getHiveType(); if (!partitionColumnNames.contains(name)) { - verify(!columnHandle.isPartitionKey(), "Column handles are not consistent with partitioned by property"); + verify(!columnHandle.isPartitionKey(), + "Column handles are not consistent with partitioned by property"); columns.add(columnHandleToColumn(metastoreContext, columnHandle)); } else { @@ -1349,7 +1437,8 @@ private static Table buildTableObject( tableBuilder.getStorageBuilder() .setStorageFormat(fromHiveStorageFormat(hiveStorageFormat)) .setBucketProperty(bucketProperty) - .setParameters(ImmutableMap.of(PREFERRED_ORDERING_COLUMNS, encodePreferredOrderingColumns(preferredOrderingColumns))) + .setParameters(ImmutableMap.of(PREFERRED_ORDERING_COLUMNS, + encodePreferredOrderingColumns(preferredOrderingColumns))) .setLocation(targetPath.toString()); return tableBuilder.build(); @@ -1375,18 +1464,21 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle failIfAvroSchemaIsSet(session, handle); MetastoreContext metastoreContext = getMetastoreContext(session); - metastore.addColumn(metastoreContext, handle.getSchemaName(), handle.getTableName(), column.getName(), toHiveType(typeTranslator, column.getType()), column.getComment()); + metastore.addColumn(metastoreContext, handle.getSchemaName(), handle.getTableName(), column.getName(), + toHiveType(typeTranslator, column.getType()), column.getComment()); } @Override - public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle source, String target) + public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle source, + String target) { HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle; failIfAvroSchemaIsSet(session, hiveTableHandle); HiveColumnHandle sourceHandle = (HiveColumnHandle) source; MetastoreContext metastoreContext = getMetastoreContext(session); - metastore.renameColumn(metastoreContext, hiveTableHandle.getSchemaName(), hiveTableHandle.getTableName(), sourceHandle.getName(), target); + metastore.renameColumn(metastoreContext, hiveTableHandle.getSchemaName(), hiveTableHandle.getTableName(), + sourceHandle.getName(), target); } @Override @@ -1397,7 +1489,8 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl HiveColumnHandle columnHandle = (HiveColumnHandle) column; MetastoreContext metastoreContext = getMetastoreContext(session); - metastore.dropColumn(metastoreContext, hiveTableHandle.getSchemaName(), hiveTableHandle.getTableName(), columnHandle.getName()); + metastore.dropColumn(metastoreContext, hiveTableHandle.getSchemaName(), hiveTableHandle.getTableName(), + columnHandle.getName()); } private void failIfAvroSchemaIsSet(ConnectorSession session, HiveTableHandle handle) @@ -1417,7 +1510,8 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand { HiveTableHandle handle = (HiveTableHandle) tableHandle; MetastoreContext metastoreContext = 
getMetastoreContext(session); - metastore.renameTable(metastoreContext, handle.getSchemaName(), handle.getTableName(), newTableName.getSchemaName(), newTableName.getTableName()); + metastore.renameTable(metastoreContext, handle.getSchemaName(), handle.getTableName(), + newTableName.getSchemaName(), newTableName.getTableName()); } @Override @@ -1431,7 +1525,8 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle throw new TableNotFoundException(handle.getSchemaTableName()); } metastore.dropTable( - new HdfsContext(session, handle.getSchemaName(), handle.getTableName(), target.get().getStorage().getLocation(), false), + new HdfsContext(session, handle.getSchemaName(), handle.getTableName(), + target.get().getStorage().getLocation(), false), handle.getSchemaName(), handle.getTableName()); } @@ -1449,7 +1544,8 @@ public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, } @Override - public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle, Collection computedStatistics) + public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle, + Collection computedStatistics) { HiveTableHandle handle = (HiveTableHandle) tableHandle; SchemaTableName tableName = handle.getSchemaTableName(); @@ -1464,13 +1560,16 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH List hiveColumnHandles = hiveColumnHandles(table); Map columnTypes = hiveColumnHandles.stream() .filter(columnHandle -> !columnHandle.isHidden()) - .collect(toImmutableMap(HiveColumnHandle::getName, column -> column.getHiveType().getType(typeManager))); + .collect( + toImmutableMap(HiveColumnHandle::getName, column -> column.getHiveType().getType(typeManager))); - Map, ComputedStatistics> computedStatisticsMap = createComputedStatisticsToPartitionMap(computedStatistics, partitionColumnNames, columnTypes); + Map, ComputedStatistics> computedStatisticsMap = createComputedStatisticsToPartitionMap( + computedStatistics, partitionColumnNames, columnTypes); if (partitionColumns.isEmpty()) { // commit analyze to unpartitioned table - metastore.setTableStatistics(metastoreContext, table, createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.of()))); + metastore.setTableStatistics(metastoreContext, table, createPartitionStatistics(session, columnTypes, + computedStatisticsMap.get(ImmutableList.of()))); } else { List> partitionValuesList; @@ -1478,8 +1577,10 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH partitionValuesList = handle.getAnalyzePartitionValues().get(); } else { - partitionValuesList = metastore.getPartitionNames(metastoreContext, handle.getSchemaName(), handle.getTableName()) - .orElseThrow(() -> new TableNotFoundException(((HiveTableHandle) tableHandle).getSchemaTableName())) + partitionValuesList = metastore + .getPartitionNames(metastoreContext, handle.getSchemaName(), handle.getTableName()) + .orElseThrow( + () -> new TableNotFoundException(((HiveTableHandle) tableHandle).getSchemaTableName())) .stream() .map(MetastoreUtil::toPartitionValues) .collect(toImmutableList()); @@ -1489,8 +1590,11 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH Map> columnStatisticTypes = hiveColumnHandles.stream() .filter(columnHandle -> !partitionColumnNames.contains(columnHandle.getName())) .filter(column -> !column.isHidden()) - .collect(toImmutableMap(HiveColumnHandle::getName, column -> 
ImmutableSet.copyOf(metastore.getSupportedColumnStatistics(metastoreContext, typeManager.getType(column.getTypeSignature()))))); - Supplier emptyPartitionStatistics = Suppliers.memoize(() -> createEmptyPartitionStatistics(columnTypes, columnStatisticTypes)); + .collect(toImmutableMap(HiveColumnHandle::getName, + column -> ImmutableSet.copyOf(metastore.getSupportedColumnStatistics(metastoreContext, + typeManager.getType(column.getTypeSignature()))))); + Supplier emptyPartitionStatistics = Suppliers + .memoize(() -> createEmptyPartitionStatistics(columnTypes, columnStatisticTypes)); int usedComputedStatistics = 0; for (List partitionValues : partitionValuesList) { @@ -1500,7 +1604,8 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH } else { usedComputedStatistics++; - partitionStatistics.put(partitionValues, createPartitionStatistics(session, columnTypes, collectedStatistics)); + partitionStatistics.put(partitionValues, + createPartitionStatistics(session, columnTypes, collectedStatistics)); } } verify(usedComputedStatistics == computedStatistics.size(), "All computed statistics must be used"); @@ -1509,12 +1614,17 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH } @Override - public HiveOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout) + public HiveOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, + Optional layout) { - verifyJvmTimeZone(); + Optional externalLocation = Optional.ofNullable(getExternalLocation(tableMetadata.getProperties())) + .map(HiveMetadata::getExternalLocationAsPath); + if (!createsOfNonManagedTablesEnabled && externalLocation.isPresent()) { + throw new PrestoException(NOT_SUPPORTED, "Creating non-managed Hive tables is disabled"); + } - if (getExternalLocation(tableMetadata.getProperties()) != null) { - throw new PrestoException(NOT_SUPPORTED, "External tables cannot be created using CREATE TABLE AS"); + if (!writesToNonManagedTablesEnabled && externalLocation.isPresent()) { + throw new PrestoException(NOT_SUPPORTED, "Writes to non-managed Hive tables is disabled"); } if (getAvroSchemaUrl(tableMetadata.getProperties()) != null) { @@ -1531,9 +1641,12 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto String schemaName = schemaTableName.getSchemaName(); String tableName = schemaTableName.getTableName(); - Optional tableEncryptionProperties = getTableEncryptionPropertiesFromTableProperties(tableMetadata, tableStorageFormat, partitionedBy); - List columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator); - HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat : getHiveStorageFormat(session); + Optional tableEncryptionProperties = getTableEncryptionPropertiesFromTableProperties( + tableMetadata, tableStorageFormat, partitionedBy); + List columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), + typeTranslator); + HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat + : getHiveStorageFormat(session); // unpartitioned tables ignore the partition storage format HiveStorageFormat actualStorageFormat = partitionedBy.isEmpty() ? 
tableStorageFormat : partitionStorageFormat; @@ -1558,9 +1671,11 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto .collect(toList()); checkPartitionTypesSupported(partitionColumns); - LocationHandle locationHandle = locationService.forNewTable(metastore, session, schemaName, tableName, isTempPathRequired(session, bucketProperty, preferredOrderingColumns)); + LocationHandle locationHandle = locationService.forNewTable(metastore, session, schemaName, tableName, + isTempPathRequired(session, bucketProperty, preferredOrderingColumns), externalLocation); - HdfsContext context = new HdfsContext(session, schemaName, tableName, locationHandle.getTargetPath().toString(), true); + HdfsContext context = new HdfsContext(session, schemaName, tableName, locationHandle.getTargetPath().toString(), + true); Map tableProperties = getEmptyTableProperties( tableMetadata, context, @@ -1581,7 +1696,9 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto preferredOrderingColumns, session.getUser(), tableProperties, - encryptionInformationProvider.getWriteEncryptionInformation(session, tableEncryptionProperties, schemaName, tableName)); + encryptionInformationProvider.getWriteEncryptionInformation(session, tableEncryptionProperties, + schemaName, tableName), + externalLocation.isPresent()); WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle); metastore.declareIntentionToWrite( @@ -1597,7 +1714,9 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto } @Override - public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) + public Optional finishCreateTable(ConnectorSession session, + ConnectorOutputTableHandle tableHandle, Collection fragments, + Collection computedStatistics) { HiveOutputTableHandle handle = (HiveOutputTableHandle) tableHandle; @@ -1609,10 +1728,12 @@ public Optional finishCreateTable(ConnectorSession sess EncryptionInformation encryptionInformation = handle.getEncryptionInformation().get(); if (encryptionInformation.getDwrfEncryptionMetadata().isPresent()) { if (handle.getPartitionedBy().isEmpty()) { - tableEncryptionParameters = ImmutableMap.copyOf(encryptionInformation.getDwrfEncryptionMetadata().get().getExtraMetadata()); + tableEncryptionParameters = ImmutableMap + .copyOf(encryptionInformation.getDwrfEncryptionMetadata().get().getExtraMetadata()); } else { - partitionEncryptionParameters = ImmutableMap.copyOf(encryptionInformation.getDwrfEncryptionMetadata().get().getExtraMetadata()); + partitionEncryptionParameters = ImmutableMap + .copyOf(encryptionInformation.getDwrfEncryptionMetadata().get().getExtraMetadata()); } } } @@ -1635,7 +1756,7 @@ public Optional finishCreateTable(ConnectorSession sess .putAll(tableEncryptionParameters) .build(), writeInfo.getTargetPath(), - MANAGED_TABLE, + handle.isExternal() ? 
EXTERNAL_TABLE : MANAGED_TABLE, prestoVersion, metastoreContext); PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(handle.getTableOwner()); @@ -1648,12 +1769,16 @@ public Optional finishCreateTable(ConnectorSession sess handle, table, partitionUpdates); - // replace partitionUpdates before creating the zero-row files so that those files will be cleaned up if we end up rollback - partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); - HdfsContext hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), true); + // replace partitionUpdates before creating the zero-row files so that those + // files will be cleaned up if we end up rollback + partitionUpdates = PartitionUpdate + .mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); + HdfsContext hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), + table.getStorage().getLocation(), true); for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) { - Optional partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : - Optional.of(partitionObjectBuilder.buildPartitionObject(session, table, partitionUpdate, prestoVersion, partitionEncryptionParameters)); + Optional partition = table.getPartitionColumns().isEmpty() ? Optional.empty() + : Optional.of(partitionObjectBuilder.buildPartitionObject(session, table, partitionUpdate, + prestoVersion, partitionEncryptionParameters)); zeroRowFileCreator.createFiles( session, hdfsContext, @@ -1666,8 +1791,10 @@ public Optional finishCreateTable(ConnectorSession sess } Map columnTypes = handle.getInputColumns().stream() - .collect(toImmutableMap(HiveColumnHandle::getName, column -> column.getHiveType().getType(typeManager))); - Map, ComputedStatistics> partitionComputedStatistics = createComputedStatisticsToPartitionMap(computedStatistics, handle.getPartitionedBy(), columnTypes); + .collect( + toImmutableMap(HiveColumnHandle::getName, column -> column.getHiveType().getType(typeManager))); + Map, ComputedStatistics> partitionComputedStatistics = createComputedStatisticsToPartitionMap( + computedStatistics, handle.getPartitionedBy(), columnTypes); PartitionStatistics tableStatistics; if (table.getPartitionColumns().isEmpty()) { @@ -1675,13 +1802,15 @@ public Optional finishCreateTable(ConnectorSession sess .map(PartitionUpdate::getStatistics) .reduce((first, second) -> reduce(first, second, ADD)) .orElse(createZeroStatistics()); - tableStatistics = createPartitionStatistics(session, basicStatistics, columnTypes, getColumnStatistics(partitionComputedStatistics, ImmutableList.of())); + tableStatistics = createPartitionStatistics(session, basicStatistics, columnTypes, + getColumnStatistics(partitionComputedStatistics, ImmutableList.of())); } else { tableStatistics = new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of()); } - metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), false, tableStatistics); + metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), false, + tableStatistics); if (handle.getPartitionedBy().isEmpty()) { return Optional.of(new HiveWrittenPartitions(ImmutableList.of(UNPARTITIONED_ID))); @@ -1693,10 +1822,12 @@ public Optional finishCreateTable(ConnectorSession sess for (PartitionUpdate update : partitionUpdates) { Map partitionParameters = 
partitionEncryptionParameters; if (isPreferManifestsToListFiles(session) && isFileRenamingEnabled(session)) { - // Store list of file names and sizes in partition metadata when prefer_manifests_to_list_files and file_renaming_enabled are set to true + // Store list of file names and sizes in partition metadata when + // prefer_manifests_to_list_files and file_renaming_enabled are set to true partitionParameters = updatePartitionMetadataWithFileNamesAndSizes(update, partitionParameters); } - Partition partition = partitionObjectBuilder.buildPartitionObject(session, table, update, prestoVersion, partitionParameters); + Partition partition = partitionObjectBuilder.buildPartitionObject(session, table, update, prestoVersion, + partitionParameters); PartitionStatistics partitionStatistics = createPartitionStatistics( session, update.getStatistics(), @@ -1708,7 +1839,8 @@ public Optional finishCreateTable(ConnectorSession sess handle.getTableName(), table.getStorage().getLocation(), true, - partitionObjectBuilder.buildPartitionObject(session, table, update, prestoVersion, partitionParameters), + partitionObjectBuilder.buildPartitionObject(session, table, update, prestoVersion, + partitionParameters), update.getWritePath(), partitionStatistics); } @@ -1726,7 +1858,9 @@ public static boolean shouldCreateFilesForMissingBuckets(Table table, ConnectorS private MetastoreContext getMetastoreContext(ConnectorSession session) { - return new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), getMetastoreHeaders(session), isUserDefinedTypeEncodingEnabled(session), metastore.getColumnConverterProvider()); + return new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), + session.getSource(), getMetastoreHeaders(session), isUserDefinedTypeEncodingEnabled(session), + metastore.getColumnConverterProvider()); } private Column columnHandleToColumn(ConnectorSession session, HiveColumnHandle handle) @@ -1741,7 +1875,8 @@ private Properties getSchema(Optional partition, Table table) private StorageFormat getStorageFormat(Optional partition, Table table) { - return partition.isPresent() ? partition.get().getStorage().getStorageFormat() : table.getStorage().getStorageFormat(); + return partition.isPresent() ? partition.get().getStorage().getStorageFormat() + : table.getStorage().getStorageFormat(); } private List computePartitionUpdatesForMissingBuckets( @@ -1754,7 +1889,8 @@ private List computePartitionUpdatesForMissingBuckets( if (!shouldCreateFilesForMissingBuckets(table, session)) { return ImmutableList.of(); } - HiveStorageFormat storageFormat = table.getPartitionColumns().isEmpty() ? handle.getTableStorageFormat() : handle.getPartitionStorageFormat(); + HiveStorageFormat storageFormat = table.getPartitionColumns().isEmpty() ? handle.getTableStorageFormat() + : handle.getPartitionStorageFormat(); // empty un-partitioned bucketed table if (table.getPartitionColumns().isEmpty() && partitionUpdates.isEmpty()) { @@ -1820,7 +1956,8 @@ private List computeFileNamesForMissingBuckets( String fileExtension = getFileExtension(fromHiveStorageFormat(storageFormat), compressionCodec); ImmutableList.Builder missingFileNamesBuilder = ImmutableList.builder(); for (int i = 0; i < bucketCount; i++) { - String targetFileName = isFileRenamingEnabled(session) ? String.valueOf(i) : computeBucketedFileName(session.getQueryId(), i) + fileExtension; + String targetFileName = isFileRenamingEnabled(session) ? 
String.valueOf(i) + : computeBucketedFileName(session.getQueryId(), i) + fileExtension; if (!existingFileNames.contains(targetFileName)) { missingFileNamesBuilder.add(targetFileName); } @@ -1852,7 +1989,8 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn if (!isWritableType(column.getType())) { throw new PrestoException( NOT_SUPPORTED, - format("Inserting into Hive table %s with column type %s not supported", tableName, column.getType())); + format("Inserting into Hive table %s with column type %s not supported", tableName, + column.getType())); } } @@ -1874,10 +2012,13 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn locationHandle = locationService.forExistingTable(metastore, session, table, tempPathRequired); } - Optional tableEncryptionProperties = getTableEncryptionPropertiesFromHiveProperties(table.getParameters(), tableStorageFormat); + Optional tableEncryptionProperties = getTableEncryptionPropertiesFromHiveProperties( + table.getParameters(), tableStorageFormat); - HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat : getHiveStorageFormat(session); - HiveStorageFormat actualStorageFormat = table.getPartitionColumns().isEmpty() ? tableStorageFormat : partitionStorageFormat; + HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat + : getHiveStorageFormat(session); + HiveStorageFormat actualStorageFormat = table.getPartitionColumns().isEmpty() ? tableStorageFormat + : partitionStorageFormat; if (tableEncryptionProperties.isPresent() && actualStorageFormat != tableStorageFormat) { throw new PrestoException( @@ -1902,11 +2043,14 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn partitionStorageFormat, actualStorageFormat, getHiveCompressionCodec(session, isTemporaryTable, actualStorageFormat), - encryptionInformationProvider.getWriteEncryptionInformation(session, tableEncryptionProperties.map(identity()), tableName.getSchemaName(), tableName.getTableName())); + encryptionInformationProvider.getWriteEncryptionInformation(session, + tableEncryptionProperties.map(identity()), tableName.getSchemaName(), + tableName.getTableName())); WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle); metastore.declareIntentionToWrite( - new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName(), table.getStorage().getLocation(), false), + new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName(), + table.getStorage().getLocation(), false), metastoreContext, writeInfo.getWriteMode(), writeInfo.getWritePath(), @@ -1916,7 +2060,8 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn return result; } - private HiveCompressionCodec getHiveCompressionCodec(ConnectorSession session, boolean isTemporaryTable, HiveStorageFormat storageFormat) + private HiveCompressionCodec getHiveCompressionCodec(ConnectorSession session, boolean isTemporaryTable, + HiveStorageFormat storageFormat) { if (isTemporaryTable) { return getTemporaryTableCompressionCodec(session); @@ -1928,12 +2073,16 @@ private HiveCompressionCodec getHiveCompressionCodec(ConnectorSession session, b } @Override - public Optional finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) + public Optional finishInsert(ConnectorSession session, + ConnectorInsertTableHandle insertHandle, Collection fragments, + 
Collection computedStatistics) { return finishInsertInternal(session, insertHandle, fragments, computedStatistics); } - private Optional finishInsertInternal(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) + private Optional finishInsertInternal(ConnectorSession session, + ConnectorInsertTableHandle insertHandle, Collection fragments, + Collection computedStatistics) { HiveInsertTableHandle handle = (HiveInsertTableHandle) insertHandle; @@ -1946,7 +2095,8 @@ private Optional finishInsertInternal(ConnectorSession Table table = metastore.getTable(metastoreContext, handle.getSchemaName(), handle.getTableName()) .orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName())); - if (!table.getStorage().getStorageFormat().getInputFormat().equals(tableStorageFormat.getInputFormat()) && isRespectTableFormat(session)) { + if (!table.getStorage().getStorageFormat().getInputFormat().equals(tableStorageFormat.getInputFormat()) + && isRespectTableFormat(session)) { throw new PrestoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Table format changed during insert"); } @@ -1956,19 +2106,24 @@ private Optional finishInsertInternal(ConnectorSession handle, table, partitionUpdates); - // replace partitionUpdates before creating the zero-row files so that those files will be cleaned up if we end up rollback - partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); - HdfsContext hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), false); + // replace partitionUpdates before creating the zero-row files so that those + // files will be cleaned up if we end up rollback + partitionUpdates = PartitionUpdate + .mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); + HdfsContext hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), + table.getStorage().getLocation(), false); for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) { - Optional partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : - Optional.of(partitionObjectBuilder.buildPartitionObject( - session, - table, - partitionUpdate, - prestoVersion, - handle.getEncryptionInformation() - .map(encryptionInfo -> encryptionInfo.getDwrfEncryptionMetadata().map(DwrfEncryptionMetadata::getExtraMetadata).orElseGet(ImmutableMap::of)) - .orElseGet(ImmutableMap::of))); + Optional partition = table.getPartitionColumns().isEmpty() ? 
Optional.empty() + : Optional.of(partitionObjectBuilder.buildPartitionObject( + session, + table, + partitionUpdate, + prestoVersion, + handle.getEncryptionInformation() + .map(encryptionInfo -> encryptionInfo.getDwrfEncryptionMetadata() + .map(DwrfEncryptionMetadata::getExtraMetadata) + .orElseGet(ImmutableMap::of)) + .orElseGet(ImmutableMap::of))); zeroRowFileCreator.createFiles( session, hdfsContext, @@ -1984,15 +2139,19 @@ private Optional finishInsertInternal(ConnectorSession .map(Column::getName) .collect(toImmutableList()); Map columnTypes = handle.getInputColumns().stream() - .collect(toImmutableMap(HiveColumnHandle::getName, column -> column.getHiveType().getType(typeManager))); - Map, ComputedStatistics> partitionComputedStatistics = createComputedStatisticsToPartitionMap(computedStatistics, partitionedBy, columnTypes); + .collect( + toImmutableMap(HiveColumnHandle::getName, column -> column.getHiveType().getType(typeManager))); + Map, ComputedStatistics> partitionComputedStatistics = createComputedStatisticsToPartitionMap( + computedStatistics, partitionedBy, columnTypes); - Set existingPartitions = getExistingPartitionNames(session.getIdentity(), metastoreContext, handle.getSchemaName(), handle.getTableName(), partitionUpdates); + Set existingPartitions = getExistingPartitionNames(session.getIdentity(), metastoreContext, + handle.getSchemaName(), handle.getTableName(), partitionUpdates); for (PartitionUpdate partitionUpdate : partitionUpdates) { if (partitionUpdate.getName().isEmpty()) { if (handle.getEncryptionInformation().isPresent()) { - throw new PrestoException(HIVE_UNSUPPORTED_ENCRYPTION_OPERATION, "Inserting into an existing table with encryption enabled is not supported yet"); + throw new PrestoException(HIVE_UNSUPPORTED_ENCRYPTION_OPERATION, + "Inserting into an existing table with encryption enabled is not supported yet"); } // insert into unpartitioned table @@ -2011,7 +2170,8 @@ private Optional finishInsertInternal(ConnectorSession } else if (partitionUpdate.getUpdateMode() == APPEND) { if (handle.getEncryptionInformation().isPresent()) { - throw new PrestoException(HIVE_UNSUPPORTED_ENCRYPTION_OPERATION, "Inserting into an existing partition with encryption enabled is not supported yet"); + throw new PrestoException(HIVE_UNSUPPORTED_ENCRYPTION_OPERATION, + "Inserting into an existing partition with encryption enabled is not supported yet"); } // insert into existing partition List partitionValues = toPartitionValues(partitionUpdate.getName()); @@ -2032,16 +2192,20 @@ else if (partitionUpdate.getUpdateMode() == APPEND) { } else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode() == OVERWRITE) { Map extraPartitionMetadata = handle.getEncryptionInformation() - .map(encryptionInfo -> encryptionInfo.getDwrfEncryptionMetadata().map(DwrfEncryptionMetadata::getExtraMetadata).orElseGet(ImmutableMap::of)) + .map(encryptionInfo -> encryptionInfo.getDwrfEncryptionMetadata() + .map(DwrfEncryptionMetadata::getExtraMetadata).orElseGet(ImmutableMap::of)) .orElseGet(ImmutableMap::of); if (isPreferManifestsToListFiles(session) && isFileRenamingEnabled(session)) { - // Store list of file names and sizes in partition metadata when prefer_manifests_to_list_files and file_renaming_enabled are set to true - extraPartitionMetadata = updatePartitionMetadataWithFileNamesAndSizes(partitionUpdate, extraPartitionMetadata); + // Store list of file names and sizes in partition metadata when + // prefer_manifests_to_list_files and file_renaming_enabled are set to true + 
extraPartitionMetadata = updatePartitionMetadataWithFileNamesAndSizes(partitionUpdate, + extraPartitionMetadata); } // Track the manifest blob size - getManifestSizeInBytes(session, partitionUpdate, extraPartitionMetadata).ifPresent(hivePartitionStats::addManifestSizeInBytes); + getManifestSizeInBytes(session, partitionUpdate, extraPartitionMetadata) + .ifPresent(hivePartitionStats::addManifestSizeInBytes); // insert into new partition or overwrite existing partition Partition partition = partitionObjectBuilder.buildPartitionObject( @@ -2050,15 +2214,19 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode partitionUpdate, prestoVersion, extraPartitionMetadata); - if (!partition.getStorage().getStorageFormat().getInputFormat().equals(handle.getPartitionStorageFormat().getInputFormat()) && isRespectTableFormat(session)) { - throw new PrestoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Partition format changed during insert"); + if (!partition.getStorage().getStorageFormat().getInputFormat() + .equals(handle.getPartitionStorageFormat().getInputFormat()) && isRespectTableFormat(session)) { + throw new PrestoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, + "Partition format changed during insert"); } if (existingPartitions.contains(partitionUpdate.getName())) { if (partitionUpdate.getUpdateMode() == OVERWRITE) { - metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), handle.getLocationHandle().getTargetPath().toString(), partition.getValues()); + metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), + handle.getLocationHandle().getTargetPath().toString(), partition.getValues()); } else { - throw new PrestoException(HIVE_PARTITION_READ_ONLY, "Cannot insert into an existing partition of Hive table: " + partitionUpdate.getName()); + throw new PrestoException(HIVE_PARTITION_READ_ONLY, + "Cannot insert into an existing partition of Hive table: " + partitionUpdate.getName()); } } PartitionStatistics partitionStatistics = createPartitionStatistics( @@ -2077,7 +2245,8 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode partitionStatistics); } else { - throw new IllegalArgumentException(format("Unsupported update mode: %s", partitionUpdate.getUpdateMode())); + throw new IllegalArgumentException( + format("Unsupported update mode: %s", partitionUpdate.getUpdateMode())); } } @@ -2088,9 +2257,11 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode .collect(toList()))); } - private static boolean isTempPathRequired(ConnectorSession session, Optional bucketProperty, List preferredOrderingColumns) + private static boolean isTempPathRequired(ConnectorSession session, Optional bucketProperty, + List preferredOrderingColumns) { - boolean hasSortedWrite = bucketProperty.map(property -> !property.getSortedBy().isEmpty()).orElse(false) || !preferredOrderingColumns.isEmpty(); + boolean hasSortedWrite = bucketProperty.map(property -> !property.getSortedBy().isEmpty()).orElse(false) + || !preferredOrderingColumns.isEmpty(); return isSortedWriteToTempPathEnabled(session) && hasSortedWrite; } @@ -2112,7 +2283,8 @@ private PartitionStatistics createPartitionStatistics( .orElseThrow(() -> new VerifyException("rowCount not present")); verify(!rowCountBlock.isNull(0), "rowCount must never be null"); long rowCount = BIGINT.getLong(rowCountBlock, 0); - HiveBasicStatistics rowCountOnlyBasicStatistics = new HiveBasicStatistics(OptionalLong.empty(), OptionalLong.of(rowCount), 
OptionalLong.empty(), OptionalLong.empty()); + HiveBasicStatistics rowCountOnlyBasicStatistics = new HiveBasicStatistics(OptionalLong.empty(), + OptionalLong.of(rowCount), OptionalLong.empty(), OptionalLong.empty()); return createPartitionStatistics(session, rowCountOnlyBasicStatistics, columnTypes, computedColumnStatistics); } @@ -2122,7 +2294,8 @@ private PartitionStatistics createPartitionStatistics( Map columnTypes, Map computedColumnStatistics) { - long rowCount = basicStatistics.getRowCount().orElseThrow(() -> new IllegalArgumentException("rowCount not present")); + long rowCount = basicStatistics.getRowCount() + .orElseThrow(() -> new IllegalArgumentException("rowCount not present")); Map columnStatistics = fromComputedStatistics( session, timeZone, @@ -2132,14 +2305,16 @@ private PartitionStatistics createPartitionStatistics( return new PartitionStatistics(basicStatistics, columnStatistics); } - private static Map getColumnStatistics(Map, ComputedStatistics> statistics, List partitionValues) + private static Map getColumnStatistics( + Map, ComputedStatistics> statistics, List partitionValues) { return Optional.ofNullable(statistics.get(partitionValues)) .map(ComputedStatistics::getColumnStatistics) .orElse(ImmutableMap.of()); } - private Set getExistingPartitionNames(ConnectorIdentity identity, MetastoreContext metastoreContext, String databaseName, String tableName, List partitionUpdates) + private Set getExistingPartitionNames(ConnectorIdentity identity, MetastoreContext metastoreContext, + String databaseName, String tableName, List partitionUpdates) { ImmutableSet.Builder existingPartitions = ImmutableSet.builder(); ImmutableSet.Builder potentiallyNewPartitions = ImmutableSet.builder(); @@ -2158,9 +2333,12 @@ private Set getExistingPartitionNames(ConnectorIdentity identity, Metast } } - // try to load potentially new partitions in batches to check if any of them exist + // try to load potentially new partitions in batches to check if any of them + // exist Lists.partition(ImmutableList.copyOf(potentiallyNewPartitions.build()), maxPartitionBatchSize).stream() - .flatMap(partitionNames -> metastore.getPartitionsByNames(metastoreContext, databaseName, tableName, partitionNames).entrySet().stream() + .flatMap(partitionNames -> metastore + .getPartitionsByNames(metastoreContext, databaseName, tableName, partitionNames).entrySet() + .stream() .filter(entry -> entry.getValue().isPresent()) .map(Map.Entry::getKey)) .forEach(existingPartitions::add); @@ -2169,7 +2347,8 @@ private Set getExistingPartitionNames(ConnectorIdentity identity, Metast } @Override - public void createView(ConnectorSession session, ConnectorTableMetadata viewMetadata, String viewData, boolean replace) + public void createView(ConnectorSession session, ConnectorTableMetadata viewMetadata, String viewData, + boolean replace) { Map properties = ImmutableMap.builder() .put(TABLE_COMMENT, "Presto View") @@ -2185,7 +2364,8 @@ public void createView(ConnectorSession session, ConnectorTableMetadata viewMeta for (ColumnMetadata column : viewMetadata.getColumns()) { try { HiveType hiveType = toHiveType(typeTranslator, column.getType()); - columns.add(new Column(column.getName(), hiveType, Optional.ofNullable(column.getComment()), columnConverter.getTypeMetadata(hiveType, column.getType().getTypeSignature()))); + columns.add(new Column(column.getName(), hiveType, Optional.ofNullable(column.getComment()), + columnConverter.getTypeMetadata(hiveType, column.getType().getTypeSignature()))); } catch (PrestoException e) { // if a 
view uses any unsupported hive types, include only a dummy column value @@ -2194,7 +2374,8 @@ public void createView(ConnectorSession session, ConnectorTableMetadata viewMeta new Column( "dummy", HIVE_STRING, - Optional.of(format("Using dummy because column %s uses unsupported Hive type %s ", column.getName(), column.getType())), + Optional.of(format("Using dummy because column %s uses unsupported Hive type %s ", + column.getName(), column.getType())), Optional.empty())); break; } @@ -2223,18 +2404,21 @@ public void createView(ConnectorSession session, ConnectorTableMetadata viewMeta Table table = tableBuilder.build(); PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(session.getUser()); - Optional
<Table> existing = metastore.getTable(metastoreContext, viewName.getSchemaName(), viewName.getTableName()); + Optional<Table>
existing = metastore.getTable(metastoreContext, viewName.getSchemaName(), + viewName.getTableName()); if (existing.isPresent()) { if (!replace || !MetastoreUtil.isPrestoView(existing.get())) { throw new ViewAlreadyExistsException(viewName); } - metastore.replaceView(metastoreContext, viewName.getSchemaName(), viewName.getTableName(), table, principalPrivileges); + metastore.replaceView(metastoreContext, viewName.getSchemaName(), viewName.getTableName(), table, + principalPrivileges); return; } try { - metastore.createTable(session, table, principalPrivileges, Optional.empty(), false, new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of())); + metastore.createTable(session, table, principalPrivileges, Optional.empty(), false, + new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of())); } catch (TableAlreadyExistsException e) { throw new ViewAlreadyExistsException(e.getTableName()); @@ -2287,7 +2471,8 @@ public Map getViews(ConnectorSession s MetastoreContext metastoreContext = getMetastoreContext(session); for (SchemaTableName schemaTableName : tableNames) { - Optional
<Table> table = metastore.getTable(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName()); + Optional<Table>
table = metastore.getTable(metastoreContext, schemaTableName.getSchemaName(), + schemaTableName.getTableName()); if (table.isPresent() && MetastoreUtil.isPrestoView(table.get())) { views.put(schemaTableName, new ConnectorViewDefinition( schemaTableName, @@ -2300,7 +2485,8 @@ public Map getViews(ConnectorSession s } @Override - public Optional getMaterializedView(ConnectorSession session, SchemaTableName viewName) + public Optional getMaterializedView(ConnectorSession session, + SchemaTableName viewName) { requireNonNull(viewName, "viewName is null"); @@ -2309,7 +2495,8 @@ public Optional getMaterializedView(Connect if (table.isPresent() && MetastoreUtil.isPrestoMaterializedView(table.get())) { try { - return Optional.of(MATERIALIZED_VIEW_JSON_CODEC.fromJson(decodeMaterializedViewData(table.get().getViewOriginalText().get()))); + return Optional.of(MATERIALIZED_VIEW_JSON_CODEC + .fromJson(decodeMaterializedViewData(table.get().getViewOriginalText().get()))); } catch (IllegalArgumentException e) { throw new PrestoException(INVALID_VIEW, "Invalid materialized view JSON", e); @@ -2320,14 +2507,16 @@ public Optional getMaterializedView(Connect } @Override - public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session, SchemaTableName materializedViewName) + public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session, + SchemaTableName materializedViewName) { MetastoreContext metastoreContext = getMetastoreContext(session); ConnectorMaterializedViewDefinition viewDefinition = getMaterializedView(session, materializedViewName) .orElseThrow(() -> new MaterializedViewNotFoundException(materializedViewName)); List
baseTables = viewDefinition.getBaseTables().stream() - .map(baseTableName -> metastore.getTable(metastoreContext, baseTableName.getSchemaName(), baseTableName.getTableName()) + .map(baseTableName -> metastore + .getTable(metastoreContext, baseTableName.getSchemaName(), baseTableName.getTableName()) .orElseThrow(() -> new TableNotFoundException(baseTableName))) .collect(toImmutableList()); @@ -2335,7 +2524,8 @@ public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session table.getTableType().equals(MANAGED_TABLE), format("base table %s is not a managed table", table.getTableName()))); - Table materializedViewTable = metastore.getTable(metastoreContext, materializedViewName.getSchemaName(), materializedViewName.getTableName()) + Table materializedViewTable = metastore + .getTable(metastoreContext, materializedViewName.getSchemaName(), materializedViewName.getTableName()) .orElseThrow(() -> new MaterializedViewNotFoundException(materializedViewName)); checkState( @@ -2344,22 +2534,28 @@ public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session validateMaterializedViewPartitionColumns(metastore, metastoreContext, materializedViewTable, viewDefinition); Map> directColumnMappings = viewDefinition.getDirectColumnMappingsAsMap(); - Map> viewToBasePartitionMap = getViewToBasePartitionMap(materializedViewTable, baseTables, directColumnMappings); + Map> viewToBasePartitionMap = getViewToBasePartitionMap( + materializedViewTable, baseTables, directColumnMappings); - MaterializedDataPredicates materializedDataPredicates = getMaterializedDataPredicates(metastore, metastoreContext, typeManager, materializedViewTable, timeZone); + MaterializedDataPredicates materializedDataPredicates = getMaterializedDataPredicates(metastore, + metastoreContext, typeManager, materializedViewTable, timeZone); if (materializedDataPredicates.getPredicateDisjuncts().isEmpty()) { return new MaterializedViewStatus(NOT_MATERIALIZED); } - // Partitions to keep track of for materialized view freshness are the partitions of every base table + // Partitions to keep track of for materialized view freshness are the + // partitions of every base table // that are not available/updated to the materialized view yet. 
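The freshness check in this hunk amounts to, for each base table, a set difference between that table's partitions and the partitions already reflected in the materialized view, followed by a threshold test on the total number of missing partitions. A minimal sketch of that idea, using plain partition-name sets in place of the connector's MaterializedDataPredicates/TupleDomain machinery (the class and method names below are illustrative only, not the real API):

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    final class MaterializedViewFreshnessSketch
    {
        private MaterializedViewFreshnessSketch() {}

        // Partitions of one base table that the materialized view has not absorbed yet.
        static Set<String> missingPartitions(Set<String> baseTablePartitions, Set<String> viewPartitions)
        {
            Set<String> missing = new HashSet<>(baseTablePartitions);
            missing.removeAll(viewPartitions);
            return missing;
        }

        // Mirrors the threshold test: if too many partitions are missing in total,
        // the view is reported as too stale to be used.
        static boolean tooManyPartitionsMissing(Map<String, Set<String>> missingByBaseTable, int threshold)
        {
            int totalMissing = missingByBaseTable.values().stream().mapToInt(Set::size).sum();
            return totalMissing > threshold;
        }
    }
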
Map partitionsFromBaseTables = baseTables.stream() .collect(toImmutableMap( baseTable -> new SchemaTableName(baseTable.getDatabaseName(), baseTable.getTableName()), baseTable -> { - MaterializedDataPredicates baseTableMaterializedPredicates = getMaterializedDataPredicates(metastore, metastoreContext, typeManager, baseTable, timeZone); - SchemaTableName schemaTableName = new SchemaTableName(baseTable.getDatabaseName(), baseTable.getTableName()); - Map viewToBaseIndirectMappedColumns = viewToBaseTableOnOuterJoinSideIndirectMappedPartitions(viewDefinition, baseTable).orElse(ImmutableMap.of()); + MaterializedDataPredicates baseTableMaterializedPredicates = getMaterializedDataPredicates( + metastore, metastoreContext, typeManager, baseTable, timeZone); + SchemaTableName schemaTableName = new SchemaTableName(baseTable.getDatabaseName(), + baseTable.getTableName()); + Map viewToBaseIndirectMappedColumns = viewToBaseTableOnOuterJoinSideIndirectMappedPartitions( + viewDefinition, baseTable).orElse(ImmutableMap.of()); return differenceDataPredicates( baseTableMaterializedPredicates, @@ -2371,7 +2567,9 @@ public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session for (MaterializedDataPredicates dataPredicates : partitionsFromBaseTables.values()) { if (!dataPredicates.getPredicateDisjuncts().isEmpty()) { if (dataPredicates.getPredicateDisjuncts().stream() - .mapToInt(tupleDomain -> tupleDomain.getDomains().isPresent() ? tupleDomain.getDomains().get().size() : 0) + .mapToInt(tupleDomain -> tupleDomain.getDomains().isPresent() + ? tupleDomain.getDomains().get().size() + : 0) .sum() > HiveSessionProperties.getMaterializedViewMissingPartitionsThreshold(session)) { return new MaterializedViewStatus(TOO_MANY_PARTITIONS_MISSING, partitionsFromBaseTables); } @@ -2384,10 +2582,12 @@ public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session } @Override - public void createMaterializedView(ConnectorSession session, ConnectorTableMetadata viewMetadata, ConnectorMaterializedViewDefinition viewDefinition, boolean ignoreExisting) + public void createMaterializedView(ConnectorSession session, ConnectorTableMetadata viewMetadata, + ConnectorMaterializedViewDefinition viewDefinition, boolean ignoreExisting) { if (isExternalTable(viewMetadata.getProperties())) { - throw new PrestoException(INVALID_TABLE_PROPERTY, "Specifying external location for materialized view is not supported."); + throw new PrestoException(INVALID_TABLE_PROPERTY, + "Specifying external location for materialized view is not supported."); } Table basicTable = prepareTable(session, viewMetadata, MATERIALIZED_VIEW); @@ -2406,7 +2606,8 @@ public void createMaterializedView(ConnectorSession session, ConnectorTableMetad .build(); Table viewTable = Table.builder(basicTable) .setParameters(parameters) - .setViewOriginalText(Optional.of(encodeMaterializedViewData(MATERIALIZED_VIEW_JSON_CODEC.toJson(viewDefinition)))) + .setViewOriginalText( + Optional.of(encodeMaterializedViewData(MATERIALIZED_VIEW_JSON_CODEC.toJson(viewDefinition)))) .setViewExpandedText(Optional.of("/* Presto Materialized View */")) .build(); @@ -2448,28 +2649,34 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN } @Override - public HiveInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle) + public HiveInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, + ConnectorTableHandle tableHandle) { return beginInsertInternal(session, 
tableHandle); } @Override - public Optional finishRefreshMaterializedView(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) + public Optional finishRefreshMaterializedView(ConnectorSession session, + ConnectorInsertTableHandle insertHandle, Collection fragments, + Collection computedStatistics) { return finishInsertInternal(session, insertHandle, fragments, computedStatistics); } @Override - public Optional> getReferencedMaterializedViews(ConnectorSession session, SchemaTableName tableName) + public Optional> getReferencedMaterializedViews(ConnectorSession session, + SchemaTableName tableName) { requireNonNull(tableName, "tableName is null"); MetastoreContext metastoreContext = getMetastoreContext(session); - Table table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()).orElseThrow(() -> new TableNotFoundException(tableName)); + Table table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()) + .orElseThrow(() -> new TableNotFoundException(tableName)); if (!table.getTableType().equals(MANAGED_TABLE) || MetastoreUtil.isPrestoMaterializedView(table)) { return Optional.empty(); } ImmutableList.Builder materializedViews = ImmutableList.builder(); - for (String viewName : Splitter.on(",").trimResults().omitEmptyStrings().splitToList(table.getParameters().getOrDefault(REFERENCED_MATERIALIZED_VIEWS, ""))) { + for (String viewName : Splitter.on(",").trimResults().omitEmptyStrings() + .splitToList(table.getParameters().getOrDefault(REFERENCED_MATERIALIZED_VIEWS, ""))) { materializedViews.add(SchemaTableName.valueOf(viewName)); } return Optional.of(materializedViews.build()); @@ -2478,7 +2685,8 @@ public Optional> getReferencedMaterializedViews(ConnectorS @Override public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle) { - throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely"); + throw new PrestoException(NOT_SUPPORTED, + "This connector only supports delete where one or more partitions are deleted entirely"); } @Override @@ -2488,7 +2696,8 @@ public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, Connect } @Override - public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorTableLayoutHandle tableLayoutHandle) + public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, + ConnectorTableLayoutHandle tableLayoutHandle) { HiveTableHandle handle = (HiveTableHandle) tableHandle; HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) tableLayoutHandle; @@ -2504,14 +2713,16 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl } else { for (HivePartition hivePartition : getOrComputePartitions(layoutHandle, session, tableHandle)) { - metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), table.get().getStorage().getLocation(), toPartitionValues(hivePartition.getPartitionId())); + metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), + table.get().getStorage().getLocation(), toPartitionValues(hivePartition.getPartitionId())); } } // it is too expensive to determine the exact number of deleted rows return OptionalLong.empty(); } - private List getOrComputePartitions(HiveTableLayoutHandle layoutHandle, ConnectorSession session, ConnectorTableHandle 
tableHandle) + private List getOrComputePartitions(HiveTableLayoutHandle layoutHandle, ConnectorSession session, + ConnectorTableHandle tableHandle) { if (layoutHandle.getPartitions().isPresent()) { return layoutHandle.getPartitions().get(); @@ -2519,8 +2730,10 @@ private List getOrComputePartitions(HiveTableLayoutHandle layoutH else { TupleDomain partitionColumnPredicate = layoutHandle.getPartitionColumnPredicate(); Predicate> predicate = convertToPredicate(partitionColumnPredicate); - List tableLayoutResults = getTableLayouts(session, tableHandle, new Constraint<>(partitionColumnPredicate, predicate), Optional.empty()); - return ((HiveTableLayoutHandle) Iterables.getOnlyElement(tableLayoutResults).getTableLayout().getHandle()).getPartitions().get(); + List tableLayoutResults = getTableLayouts(session, tableHandle, + new Constraint<>(partitionColumnPredicate, predicate), Optional.empty()); + return ((HiveTableLayoutHandle) Iterables.getOnlyElement(tableLayoutResults).getTableLayout().getHandle()) + .getPartitions().get(); } } @@ -2531,7 +2744,8 @@ static Predicate> convertToPredicate(TupleDomai } @Override - public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Optional tableLayoutHandle) + public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, + Optional tableLayoutHandle) { if (!tableLayoutHandle.isPresent()) { return true; @@ -2591,25 +2805,31 @@ private String createTableLayoutString( .omitNullValues() .add("buckets", bucketHandle.map(HiveBucketHandle::getReadBucketCount).orElse(null)) .add("bucketsToKeep", bucketFilter.map(HiveBucketFilter::getBucketsToKeep).orElse(null)) - .add("filter", TRUE_CONSTANT.equals(remainingPredicate) ? null : rowExpressionService.formatRowExpression(session, remainingPredicate)) - .add("domains", domainPredicate.isAll() ? null : domainPredicate.toString(session.getSqlFunctionProperties())) + .add("filter", + TRUE_CONSTANT.equals(remainingPredicate) ? null + : rowExpressionService.formatRowExpression(session, remainingPredicate)) + .add("domains", + domainPredicate.isAll() ? 
null : domainPredicate.toString(session.getSqlFunctionProperties())) .toString(); } @Override - public List getTableLayouts(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint, Optional> desiredColumns) + public List getTableLayouts(ConnectorSession session, ConnectorTableHandle tableHandle, + Constraint constraint, Optional> desiredColumns) { HiveTableHandle handle = (HiveTableHandle) tableHandle; HivePartitionResult hivePartitionResult; if (handle.getAnalyzePartitionValues().isPresent()) { verify(constraint.getSummary().isAll(), "There shouldn't be any constraint for ANALYZE operation"); - hivePartitionResult = partitionManager.getPartitions(metastore, tableHandle, handle.getAnalyzePartitionValues().get(), session); + hivePartitionResult = partitionManager.getPartitions(metastore, tableHandle, + handle.getAnalyzePartitionValues().get(), session); } else { hivePartitionResult = partitionManager.getPartitions(metastore, tableHandle, constraint, session); } - Map predicateColumns = hivePartitionResult.getEffectivePredicate().getDomains().get().keySet().stream() + Map predicateColumns = hivePartitionResult.getEffectivePredicate().getDomains().get() + .keySet().stream() .map(HiveColumnHandle.class::cast) .collect(toImmutableMap(HiveColumnHandle::getName, Functions.identity())); @@ -2619,7 +2839,8 @@ public List getTableLayouts(ConnectorSession session hiveBucketHandle = Optional.of(createVirtualBucketHandle(virtualBucketCount)); } - TupleDomain domainPredicate = hivePartitionResult.getEffectivePredicate().transform(HiveMetadata::toSubfield); + TupleDomain domainPredicate = hivePartitionResult.getEffectivePredicate() + .transform(HiveMetadata::toSubfield); Table table = metastore.getTable( getMetastoreContext(session), handle.getSchemaTableName().getSchemaName(), @@ -2643,8 +2864,11 @@ public List getTableLayouts(ConnectorSession session hiveBucketHandle, hivePartitionResult.getBucketFilter(), false, - createTableLayoutString(session, handle.getSchemaTableName(), hivePartitionResult.getBucketHandle(), hivePartitionResult.getBucketFilter(), TRUE_CONSTANT, domainPredicate), - desiredColumns.map(columns -> columns.stream().map(column -> (HiveColumnHandle) column).collect(toImmutableSet())), + createTableLayoutString(session, handle.getSchemaTableName(), + hivePartitionResult.getBucketHandle(), hivePartitionResult.getBucketFilter(), + TRUE_CONSTANT, domainPredicate), + desiredColumns.map(columns -> columns.stream().map(column -> (HiveColumnHandle) column) + .collect(toImmutableSet())), false)), hivePartitionResult.getUnenforcedConstraint())); } @@ -2663,8 +2887,10 @@ private boolean isPushdownFilterEnabled(ConnectorSession session, ConnectorTable { boolean pushdownFilterEnabled = HiveSessionProperties.isPushdownFilterEnabled(session); if (pushdownFilterEnabled) { - HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(getTableMetadata(session, tableHandle).getProperties()); - if (hiveStorageFormat == ORC || hiveStorageFormat == DWRF || hiveStorageFormat == PARQUET && isParquetPushdownFilterEnabled(session)) { + HiveStorageFormat hiveStorageFormat = getHiveStorageFormat( + getTableMetadata(session, tableHandle).getProperties()); + if (hiveStorageFormat == ORC || hiveStorageFormat == DWRF + || hiveStorageFormat == PARQUET && isParquetPushdownFilterEnabled(session)) { return true; } } @@ -2674,7 +2900,8 @@ private boolean isPushdownFilterEnabled(ConnectorSession session, ConnectorTable private List pruneColumnComments(List columns) { return columns.stream() - 
.map(column -> new Column(column.getName(), column.getType(), Optional.empty(), column.getTypeMetadata())) + .map(column -> new Column(column.getName(), column.getType(), Optional.empty(), + column.getTypeMetadata())) .collect(toImmutableList()); } @@ -2690,7 +2917,8 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa // Do not create tuple domains for every partition at the same time! // There can be a huge number of partitions so use an iterable so // all domains do not need to be in memory at the same time. - Iterable> partitionDomains = Iterables.transform(partitions, (hivePartition) -> TupleDomain.fromFixedValues(hivePartition.getKeys())); + Iterable> partitionDomains = Iterables.transform(partitions, + (hivePartition) -> TupleDomain.fromFixedValues(hivePartition.getKeys())); discretePredicates = Optional.of(new DiscretePredicates(partitionColumns, partitionDomains)); } @@ -2699,8 +2927,10 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa MetastoreContext metastoreContext = getMetastoreContext(session); Table table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()) .orElseThrow(() -> new TableNotFoundException(tableName)); - // never ignore table bucketing for temporary tables as those are created such explicitly by the engine request - boolean bucketExecutionEnabled = table.getTableType().equals(TEMPORARY_TABLE) || isBucketExecutionEnabled(session); + // never ignore table bucketing for temporary tables as those are created such + // explicitly by the engine request + boolean bucketExecutionEnabled = table.getTableType().equals(TEMPORARY_TABLE) + || isBucketExecutionEnabled(session); if (bucketExecutionEnabled && hiveLayoutHandle.getBucketHandle().isPresent()) { HiveBucketHandle hiveBucketHandle = hiveLayoutHandle.getBucketHandle().get(); HivePartitioningHandle partitioningHandle; @@ -2735,7 +2965,8 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa maxCompatibleBucketCount); break; default: - throw new IllegalArgumentException("Unsupported bucket function type " + bucketProperty.getBucketFunctionType()); + throw new IllegalArgumentException( + "Unsupported bucket function type " + bucketProperty.getBucketFunctionType()); } } @@ -2770,7 +3001,8 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa } @Override - public Optional getCommonPartitioningHandle(ConnectorSession session, ConnectorPartitioningHandle left, ConnectorPartitioningHandle right) + public Optional getCommonPartitioningHandle(ConnectorSession session, + ConnectorPartitioningHandle left, ConnectorPartitioningHandle right) { HivePartitioningHandle leftHandle = (HivePartitioningHandle) left; HivePartitioningHandle rightHandle = (HivePartitioningHandle) right; @@ -2798,10 +3030,12 @@ public Optional getCommonPartitioningHandle(Connect return Optional.empty(); } - OptionalInt maxCompatibleBucketCount = min(leftHandle.getMaxCompatibleBucketCount(), rightHandle.getMaxCompatibleBucketCount()); + OptionalInt maxCompatibleBucketCount = min(leftHandle.getMaxCompatibleBucketCount(), + rightHandle.getMaxCompatibleBucketCount()); if (maxCompatibleBucketCount.isPresent() && maxCompatibleBucketCount.getAsInt() < smallerBucketCount) { // maxCompatibleBucketCount must be larger than or equal to smallerBucketCount - // because the current code uses the smallerBucketCount as the common partitioning handle. 
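The checkArgument in getAlternativeLayoutHandle a little further down spells out the compatibility rule for two Hive bucket counts: one must divide the other and the ratio must be a power of two. A minimal sketch of that rule in isolation (the helper name is illustrative; bucket counts are assumed to be positive, as they are for real bucketed tables):

    final class BucketCountCompatibility
    {
        private BucketCountCompatibility() {}

        // Mirrors the divisibility test in the nearby hunks: the larger bucket count must be
        // the smaller one multiplied by a power of two.
        static boolean areCompatible(int left, int right)
        {
            int larger = Math.max(left, right);
            int smaller = Math.min(left, right);
            return larger % smaller == 0 && Integer.bitCount(larger / smaller) == 1;
        }
    }
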
+ // because the current code uses the smallerBucketCount as the common + // partitioning handle. return Optional.empty(); } @@ -2814,7 +3048,8 @@ public Optional getCommonPartitioningHandle(Connect } @Override - public boolean isRefinedPartitioningOver(ConnectorSession session, ConnectorPartitioningHandle left, ConnectorPartitioningHandle right) + public boolean isRefinedPartitioningOver(ConnectorSession session, ConnectorPartitioningHandle left, + ConnectorPartitioningHandle right) { HivePartitioningHandle leftHandle = (HivePartitioningHandle) left; HivePartitioningHandle rightHandle = (HivePartitioningHandle) right; @@ -2845,14 +3080,17 @@ private static OptionalInt min(OptionalInt left, OptionalInt right) } @Override - public ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession session, ConnectorTableLayoutHandle tableLayoutHandle, ConnectorPartitioningHandle partitioningHandle) + public ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession session, + ConnectorTableLayoutHandle tableLayoutHandle, ConnectorPartitioningHandle partitioningHandle) { HiveTableLayoutHandle hiveLayoutHandle = (HiveTableLayoutHandle) tableLayoutHandle; HivePartitioningHandle hivePartitioningHandle = (HivePartitioningHandle) partitioningHandle; - checkArgument(hiveLayoutHandle.getBucketHandle().isPresent(), "Hive connector only provides alternative layout for bucketed table"); + checkArgument(hiveLayoutHandle.getBucketHandle().isPresent(), + "Hive connector only provides alternative layout for bucketed table"); HiveBucketHandle bucketHandle = hiveLayoutHandle.getBucketHandle().get(); - ImmutableList bucketTypes = bucketHandle.getColumns().stream().map(HiveColumnHandle::getHiveType).collect(toImmutableList()); + ImmutableList bucketTypes = bucketHandle.getColumns().stream().map(HiveColumnHandle::getHiveType) + .collect(toImmutableList()); Optional> hiveTypes = hivePartitioningHandle.getHiveTypes(); checkArgument( hivePartitioningHandle.getBucketFunctionType().equals(HIVE_COMPATIBLE), @@ -2866,7 +3104,8 @@ public ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession se int largerBucketCount = Math.max(bucketHandle.getTableBucketCount(), hivePartitioningHandle.getBucketCount()); int smallerBucketCount = Math.min(bucketHandle.getTableBucketCount(), hivePartitioningHandle.getBucketCount()); checkArgument( - largerBucketCount % smallerBucketCount == 0 && Integer.bitCount(largerBucketCount / smallerBucketCount) == 1, + largerBucketCount % smallerBucketCount == 0 + && Integer.bitCount(largerBucketCount / smallerBucketCount) == 1, "The requested partitioning is not a valid alternative for the table layout"); return new HiveTableLayoutHandle( @@ -2880,7 +3119,8 @@ public ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession se hiveLayoutHandle.getRemainingPredicate(), hiveLayoutHandle.getPredicateColumns(), hiveLayoutHandle.getPartitionColumnPredicate(), - Optional.of(new HiveBucketHandle(bucketHandle.getColumns(), bucketHandle.getTableBucketCount(), hivePartitioningHandle.getBucketCount())), + Optional.of(new HiveBucketHandle(bucketHandle.getColumns(), bucketHandle.getTableBucketCount(), + hivePartitioningHandle.getBucketCount())), hiveLayoutHandle.getBucketFilter(), hiveLayoutHandle.isPushdownFilterEnabled(), hiveLayoutHandle.getLayoutString(), @@ -2889,7 +3129,8 @@ public ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession se } @Override - public ConnectorPartitioningHandle getPartitioningHandleForExchange(ConnectorSession session, 
int partitionCount, List partitionTypes) + public ConnectorPartitioningHandle getPartitioningHandleForExchange(ConnectorSession session, int partitionCount, + List partitionTypes) { BucketFunctionType bucketFunctionType = getBucketFunctionTypeForExchange(session); @@ -2900,8 +3141,10 @@ public ConnectorPartitioningHandle getPartitioningHandleForExchange(ConnectorSes } } else if (getTemporaryTableStorageFormat(session) == ORC) { - // In this case, ORC is either default format or chosen by user. It should not be reset to PAGEFILE. - // ORC format should not be mixed with PRESTO_NATIVE as ORC compression may reuse PRESTO_NATIVE for compression + // In this case, ORC is either default format or chosen by user. It should not + // be reset to PAGEFILE. + // ORC format should not be mixed with PRESTO_NATIVE as ORC compression may + // reuse PRESTO_NATIVE for compression // which leads to hash collision bucketFunctionType = HIVE_COMPATIBLE; } @@ -2924,7 +3167,8 @@ else if (getTemporaryTableStorageFormat(session) == ORC) { } @VisibleForTesting - static TupleDomain createPredicate(List partitionColumns, List partitions) + static TupleDomain createPredicate(List partitionColumns, + List partitions) { if (partitions.isEmpty()) { return TupleDomain.none(); @@ -2946,7 +3190,8 @@ private static Domain buildColumnDomain(ColumnHandle column, List for (HivePartition partition : partitions) { NullableValue value = partition.getKeys().get(column); if (value == null) { - throw new PrestoException(HIVE_UNKNOWN_ERROR, format("Partition %s does not have a value for partition column %s", partition, column)); + throw new PrestoException(HIVE_UNKNOWN_ERROR, + format("Partition %s does not have a value for partition column %s", partition, column)); } if (value.isNull()) { @@ -2974,7 +3219,8 @@ private static Domain buildColumnDomain(ColumnHandle column, List } @Override - public Optional getInsertLayout(ConnectorSession session, ConnectorTableHandle tableHandle) + public Optional getInsertLayout(ConnectorSession session, + ConnectorTableHandle tableHandle) { HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle; SchemaTableName tableName = hiveTableHandle.getSchemaTableName(); @@ -3011,7 +3257,8 @@ public Optional getInsertLayout(ConnectorSession sessio maxCompatibleBucketCount); break; default: - throw new IllegalArgumentException("Unsupported bucket function type " + bucketProperty.getBucketFunctionType()); + throw new IllegalArgumentException( + "Unsupported bucket function type " + bucketProperty.getBucketFunctionType()); } List partitionColumns = hiveBucketHandle.get().getColumns().stream() @@ -3021,7 +3268,8 @@ public Optional getInsertLayout(ConnectorSession sessio } @Override - public Optional getPreferredShuffleLayoutForInsert(ConnectorSession session, ConnectorTableHandle tableHandle) + public Optional getPreferredShuffleLayoutForInsert(ConnectorSession session, + ConnectorTableHandle tableHandle) { HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle; SchemaTableName tableName = hiveTableHandle.getSchemaTableName(); @@ -3031,7 +3279,8 @@ public Optional getPreferredShuffleLayoutForInsert(Conn Optional hiveBucketHandle = getHiveBucketHandle(table); if (hiveBucketHandle.isPresent()) { - // For bucketed table, table partitioning (i.e. the bucketing scheme) should be respected, + // For bucketed table, table partitioning (i.e. 
the bucketing scheme) should be + // respected, // and there is no additional preferred shuffle partitioning return Optional.empty(); } @@ -3040,7 +3289,8 @@ public Optional getPreferredShuffleLayoutForInsert(Conn return Optional.empty(); } - // TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function) + // TODO: the shuffle partitioning could use a better hash function (instead of + // Hive bucket function) HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle( SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE, table.getPartitionColumns().stream() @@ -3055,7 +3305,8 @@ public Optional getPreferredShuffleLayoutForInsert(Conn } @Override - public Optional getNewTableLayout(ConnectorSession session, ConnectorTableMetadata tableMetadata) + public Optional getNewTableLayout(ConnectorSession session, + ConnectorTableMetadata tableMetadata) { validatePartitionColumns(tableMetadata); validateBucketColumns(tableMetadata); @@ -3086,13 +3337,15 @@ public Optional getNewTableLayout(ConnectorSession sess } @Override - public Optional getPreferredShuffleLayoutForNewTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + public Optional getPreferredShuffleLayoutForNewTable(ConnectorSession session, + ConnectorTableMetadata tableMetadata) { validatePartitionColumns(tableMetadata); validateBucketColumns(tableMetadata); Optional bucketProperty = getBucketProperty(tableMetadata.getProperties()); if (bucketProperty.isPresent()) { - // For bucketed table, table partitioning (i.e. the bucketing scheme) should be respected, + // For bucketed table, table partitioning (i.e. the bucketing scheme) should be + // respected, // and there is no additional preferred shuffle partitioning return Optional.empty(); } @@ -3102,14 +3355,16 @@ public Optional getPreferredShuffleLayoutForNewTable(Co return Optional.empty(); } - List columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator); + List columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), + typeTranslator); Map columnHandlesByName = Maps.uniqueIndex(columnHandles, HiveColumnHandle::getName); List partitionColumns = partitionedBy.stream() .map(columnHandlesByName::get) .map(columnHandle -> columnHandleToColumn(session, columnHandle)) .collect(toList()); - // TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function) + // TODO: the shuffle partitioning could use a better hash function (instead of + // Hive bucket function) HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle( SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE, partitionColumns.stream() @@ -3121,30 +3376,37 @@ public Optional getPreferredShuffleLayoutForNewTable(Co } @Override - public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, ConnectorTableMetadata tableMetadata) + public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, + ConnectorTableMetadata tableMetadata) { if (!isCollectColumnStatisticsOnWrite(session)) { return TableStatisticsMetadata.empty(); } List partitionedBy = firstNonNull(getPartitionedBy(tableMetadata.getProperties()), ImmutableList.of()); MetastoreContext metastoreContext = getMetastoreContext(session); - Optional
<Table> table = metastore.getTable(metastoreContext, tableMetadata.getTable().getSchemaName(), tableMetadata.getTable().getTableName()); - return getStatisticsCollectionMetadata(session, tableMetadata.getColumns(), partitionedBy, false, table.isPresent() && table.get().getTableType() == TEMPORARY_TABLE); + Optional<Table>
table = metastore.getTable(metastoreContext, tableMetadata.getTable().getSchemaName(), + tableMetadata.getTable().getTableName()); + return getStatisticsCollectionMetadata(session, tableMetadata.getColumns(), partitionedBy, false, + table.isPresent() && table.get().getTableType() == TEMPORARY_TABLE); } @Override - public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession session, ConnectorTableMetadata tableMetadata) + public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession session, + ConnectorTableMetadata tableMetadata) { List partitionedBy = firstNonNull(getPartitionedBy(tableMetadata.getProperties()), ImmutableList.of()); return getStatisticsCollectionMetadata(session, tableMetadata.getColumns(), partitionedBy, true, false); } - private TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession session, List columns, List partitionedBy, boolean includeRowCount, boolean isTemporaryTable) + private TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession session, + List columns, List partitionedBy, boolean includeRowCount, + boolean isTemporaryTable) { Set columnStatistics = columns.stream() .filter(column -> !partitionedBy.contains(column.getName())) .filter(column -> !column.isHidden()) - .map(meta -> isTemporaryTable ? this.getColumnStatisticMetadataForTemporaryTable(meta) : this.getColumnStatisticMetadata(session, meta)) + .map(meta -> isTemporaryTable ? this.getColumnStatisticMetadataForTemporaryTable(meta) + : this.getColumnStatisticMetadata(session, meta)) .flatMap(List::stream) .collect(toImmutableSet()); @@ -3152,18 +3414,22 @@ private TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession return new TableStatisticsMetadata(columnStatistics, tableStatistics, partitionedBy); } - private List getColumnStatisticMetadata(ConnectorSession session, ColumnMetadata columnMetadata) + private List getColumnStatisticMetadata(ConnectorSession session, + ColumnMetadata columnMetadata) { MetastoreContext metastoreContext = getMetastoreContext(session); - return getColumnStatisticMetadata(columnMetadata.getName(), metastore.getSupportedColumnStatistics(metastoreContext, columnMetadata.getType())); + return getColumnStatisticMetadata(columnMetadata.getName(), + metastore.getSupportedColumnStatistics(metastoreContext, columnMetadata.getType())); } private List getColumnStatisticMetadataForTemporaryTable(ColumnMetadata columnMetadata) { - return getColumnStatisticMetadata(columnMetadata.getName(), getSupportedColumnStatisticsForTemporaryTable(columnMetadata.getType())); + return getColumnStatisticMetadata(columnMetadata.getName(), + getSupportedColumnStatisticsForTemporaryTable(columnMetadata.getType())); } - private List getColumnStatisticMetadata(String columnName, Set statisticTypes) + private List getColumnStatisticMetadata(String columnName, + Set statisticTypes) { return statisticTypes.stream() .map(type -> new ColumnStatisticMetadata(columnName, type)) @@ -3175,7 +3441,8 @@ public void createRole(ConnectorSession session, String role, Optional listRoleGrants(ConnectorSession session, PrestoPrincipal p } @Override - public void grantRoles(ConnectorSession session, Set roles, Set grantees, boolean withAdminOption, Optional grantor) + public void grantRoles(ConnectorSession session, Set roles, Set grantees, + boolean withAdminOption, Optional grantor) { MetastoreContext metastoreContext = getMetastoreContext(session); - metastore.grantRoles(metastoreContext, roles, grantees, withAdminOption, 
grantor.orElse(new PrestoPrincipal(USER, session.getUser()))); + metastore.grantRoles(metastoreContext, roles, grantees, withAdminOption, + grantor.orElse(new PrestoPrincipal(USER, session.getUser()))); } @Override - public void revokeRoles(ConnectorSession session, Set roles, Set grantees, boolean adminOptionFor, Optional grantor) + public void revokeRoles(ConnectorSession session, Set roles, Set grantees, + boolean adminOptionFor, Optional grantor) { MetastoreContext metastoreContext = getMetastoreContext(session); - metastore.revokeRoles(metastoreContext, roles, grantees, adminOptionFor, grantor.orElse(new PrestoPrincipal(USER, session.getUser()))); + metastore.revokeRoles(metastoreContext, roles, grantees, adminOptionFor, + grantor.orElse(new PrestoPrincipal(USER, session.getUser()))); } @Override public Set listApplicableRoles(ConnectorSession session, PrestoPrincipal principal) { MetastoreContext metastoreContext = getMetastoreContext(session); - return ThriftMetastoreUtil.listApplicableRoles(principal, (PrestoPrincipal p) -> metastore.listRoleGrants(metastoreContext, p)) + return ThriftMetastoreUtil + .listApplicableRoles(principal, (PrestoPrincipal p) -> metastore.listRoleGrants(metastoreContext, p)) .collect(toImmutableSet()); } @@ -3225,18 +3497,22 @@ public Set listApplicableRoles(ConnectorSession session, PrestoPrinci public Set listEnabledRoles(ConnectorSession session) { MetastoreContext metastoreContext = getMetastoreContext(session); - return ThriftMetastoreUtil.listEnabledRoles(session.getIdentity(), (PrestoPrincipal p) -> metastore.listRoleGrants(metastoreContext, p)) + return ThriftMetastoreUtil + .listEnabledRoles(session.getIdentity(), + (PrestoPrincipal p) -> metastore.listRoleGrants(metastoreContext, p)) .collect(toImmutableSet()); } @Override - public void grantTablePrivileges(ConnectorSession session, SchemaTableName schemaTableName, Set privileges, PrestoPrincipal grantee, boolean grantOption) + public void grantTablePrivileges(ConnectorSession session, SchemaTableName schemaTableName, + Set privileges, PrestoPrincipal grantee, boolean grantOption) { String schemaName = schemaTableName.getSchemaName(); String tableName = schemaTableName.getTableName(); Set hivePrivilegeInfos = privileges.stream() - .map(privilege -> new HivePrivilegeInfo(toHivePrivilege(privilege), grantOption, new PrestoPrincipal(USER, session.getUser()), new PrestoPrincipal(USER, session.getUser()))) + .map(privilege -> new HivePrivilegeInfo(toHivePrivilege(privilege), grantOption, + new PrestoPrincipal(USER, session.getUser()), new PrestoPrincipal(USER, session.getUser()))) .collect(toSet()); MetastoreContext metastoreContext = getMetastoreContext(session); @@ -3244,13 +3520,15 @@ public void grantTablePrivileges(ConnectorSession session, SchemaTableName schem } @Override - public void revokeTablePrivileges(ConnectorSession session, SchemaTableName schemaTableName, Set privileges, PrestoPrincipal grantee, boolean grantOption) + public void revokeTablePrivileges(ConnectorSession session, SchemaTableName schemaTableName, + Set privileges, PrestoPrincipal grantee, boolean grantOption) { String schemaName = schemaTableName.getSchemaName(); String tableName = schemaTableName.getTableName(); Set hivePrivilegeInfos = privileges.stream() - .map(privilege -> new HivePrivilegeInfo(toHivePrivilege(privilege), grantOption, new PrestoPrincipal(USER, session.getUser()), new PrestoPrincipal(USER, session.getUser()))) + .map(privilege -> new HivePrivilegeInfo(toHivePrivilege(privilege), grantOption, + new 
PrestoPrincipal(USER, session.getUser()), new PrestoPrincipal(USER, session.getUser()))) .collect(toSet()); MetastoreContext metastoreContext = getMetastoreContext(session); @@ -3279,7 +3557,8 @@ public List listTablePrivileges(ConnectorSession session, SchemaTable } @Override - public CompletableFuture commitPageSinkAsync(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments) + public CompletableFuture commitPageSinkAsync(ConnectorSession session, ConnectorOutputTableHandle tableHandle, + Collection fragments) { HiveOutputTableHandle handle = (HiveOutputTableHandle) tableHandle; return toCompletableFuture(stagingFileCommitter.commitFiles( @@ -3292,7 +3571,8 @@ public CompletableFuture commitPageSinkAsync(ConnectorSession session, Con } @Override - public CompletableFuture commitPageSinkAsync(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection fragments) + public CompletableFuture commitPageSinkAsync(ConnectorSession session, ConnectorInsertTableHandle tableHandle, + Collection fragments) { HiveInsertTableHandle handle = (HiveInsertTableHandle) tableHandle; return toCompletableFuture(stagingFileCommitter.commitFiles( @@ -3305,7 +3585,8 @@ public CompletableFuture commitPageSinkAsync(ConnectorSession session, Con } @Override - public List getMetadataUpdateResults(List metadataUpdateRequests, QueryId queryId) + public List getMetadataUpdateResults( + List metadataUpdateRequests, QueryId queryId) { return hiveFileRenamer.getMetadataUpdateResults(metadataUpdateRequests, queryId); } @@ -3316,11 +3597,13 @@ public void doMetadataUpdateCleanup(QueryId queryId) hiveFileRenamer.cleanup(queryId); } - private List buildGrants(ConnectorSession session, SchemaTableName tableName, PrestoPrincipal principal) + private List buildGrants(ConnectorSession session, SchemaTableName tableName, + PrestoPrincipal principal) { ImmutableList.Builder result = ImmutableList.builder(); MetastoreContext metastoreContext = getMetastoreContext(session); - Set hivePrivileges = metastore.listTablePrivileges(metastoreContext, tableName.getSchemaName(), tableName.getTableName(), principal); + Set hivePrivileges = metastore.listTablePrivileges(metastoreContext, + tableName.getSchemaName(), tableName.getTableName(), principal); for (HivePrivilegeInfo hivePrivilege : hivePrivileges) { Set prestoPrivileges = hivePrivilege.toPrivilegeInfo(); for (PrivilegeInfo prestoPrivilege : prestoPrivileges) { @@ -3379,7 +3662,8 @@ private static HiveStorageFormat extractHiveStorageFormat(Table table) return format; } } - throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, format("Output format %s with SerDe %s is not supported", outputFormat, serde)); + throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, + format("Output format %s with SerDe %s is not supported", outputFormat, serde)); } @VisibleForTesting @@ -3397,7 +3681,8 @@ static List decodePreferredOrderingColumnsFromStorage(Storage sto return ImmutableList.of(); } - return Splitter.on(COMMA).trimResults().omitEmptyStrings().splitToList(storage.getParameters().get(PREFERRED_ORDERING_COLUMNS)).stream() + return Splitter.on(COMMA).trimResults().omitEmptyStrings() + .splitToList(storage.getParameters().get(PREFERRED_ORDERING_COLUMNS)).stream() .map(SortingColumn::sortingColumnFromString) .collect(toImmutableList()); } @@ -3414,14 +3699,16 @@ private static void validateBucketColumns(ConnectorTableMetadata tableMetadata) List bucketedBy = bucketProperty.get().getBucketedBy(); if (!allColumns.containsAll(bucketedBy)) { - throw new 
PrestoException(INVALID_TABLE_PROPERTY, format("Bucketing columns %s not present in schema", Sets.difference(ImmutableSet.copyOf(bucketedBy), ImmutableSet.copyOf(allColumns)))); + throw new PrestoException(INVALID_TABLE_PROPERTY, format("Bucketing columns %s not present in schema", + Sets.difference(ImmutableSet.copyOf(bucketedBy), ImmutableSet.copyOf(allColumns)))); } List sortedBy = bucketProperty.get().getSortedBy().stream() .map(SortingColumn::getColumnName) .collect(toImmutableList()); if (!allColumns.containsAll(sortedBy)) { - throw new PrestoException(INVALID_TABLE_PROPERTY, format("Sorting columns %s not present in schema", Sets.difference(ImmutableSet.copyOf(sortedBy), ImmutableSet.copyOf(allColumns)))); + throw new PrestoException(INVALID_TABLE_PROPERTY, format("Sorting columns %s not present in schema", + Sets.difference(ImmutableSet.copyOf(sortedBy), ImmutableSet.copyOf(allColumns)))); } } @@ -3443,7 +3730,8 @@ private static void validatePartitionColumns(ConnectorTableMetadata tableMetadat .collect(toList()); if (!allColumns.containsAll(partitionedBy)) { - throw new PrestoException(INVALID_TABLE_PROPERTY, format("Partition columns %s not present in schema", Sets.difference(ImmutableSet.copyOf(partitionedBy), ImmutableSet.copyOf(allColumns)))); + throw new PrestoException(INVALID_TABLE_PROPERTY, format("Partition columns %s not present in schema", + Sets.difference(ImmutableSet.copyOf(partitionedBy), ImmutableSet.copyOf(allColumns)))); } if (allColumns.size() == partitionedBy.size()) { @@ -3451,21 +3739,26 @@ private static void validatePartitionColumns(ConnectorTableMetadata tableMetadat } if (!allColumns.subList(allColumns.size() - partitionedBy.size(), allColumns.size()).equals(partitionedBy)) { - throw new PrestoException(HIVE_COLUMN_ORDER_MISMATCH, "Partition keys must be the last columns in the table and in the same order as the table properties: " + partitionedBy); + throw new PrestoException(HIVE_COLUMN_ORDER_MISMATCH, + "Partition keys must be the last columns in the table and in the same order as the table properties: " + + partitionedBy); } } - protected Optional getTableEncryptionPropertiesFromTableProperties(ConnectorTableMetadata tableMetadata, HiveStorageFormat hiveStorageFormat, List partitionedBy) + protected Optional getTableEncryptionPropertiesFromTableProperties( + ConnectorTableMetadata tableMetadata, HiveStorageFormat hiveStorageFormat, List partitionedBy) { ColumnEncryptionInformation columnEncryptionInformation = getEncryptColumns(tableMetadata.getProperties()); String tableEncryptionReference = getEncryptTable(tableMetadata.getProperties()); - if (tableEncryptionReference == null && (columnEncryptionInformation == null || !columnEncryptionInformation.hasEntries())) { + if (tableEncryptionReference == null + && (columnEncryptionInformation == null || !columnEncryptionInformation.hasEntries())) { return Optional.empty(); } if (tableEncryptionReference != null && columnEncryptionInformation.hasEntries()) { - throw new PrestoException(INVALID_TABLE_PROPERTY, format("Only one of %s or %s should be specified", ENCRYPT_TABLE, ENCRYPT_COLUMNS)); + throw new PrestoException(INVALID_TABLE_PROPERTY, + format("Only one of %s or %s should be specified", ENCRYPT_TABLE, ENCRYPT_COLUMNS)); } if (hiveStorageFormat != DWRF) { @@ -3475,34 +3768,44 @@ protected Optional getTableEncryptionPropertiesFromTa // Change based on the file format as more file formats might support encryption if (tableEncryptionReference != null) { - return 
Optional.of(getDwrfTableEncryptionProperties(Optional.of(tableEncryptionReference), Optional.empty(), tableMetadata)); + return Optional.of(getDwrfTableEncryptionProperties(Optional.of(tableEncryptionReference), Optional.empty(), + tableMetadata)); } partitionedBy.forEach(partitionColumn -> { - if (columnEncryptionInformation.getColumnToKeyReference().containsKey(ColumnWithStructSubfield.valueOf(partitionColumn))) { - throw new PrestoException(INVALID_TABLE_PROPERTY, format("Partition column (%s) cannot be used as an encryption column", partitionColumn)); + if (columnEncryptionInformation.getColumnToKeyReference() + .containsKey(ColumnWithStructSubfield.valueOf(partitionColumn))) { + throw new PrestoException(INVALID_TABLE_PROPERTY, + format("Partition column (%s) cannot be used as an encryption column", partitionColumn)); } }); - Map columnMetadata = tableMetadata.getColumns().stream().collect(toImmutableMap(ColumnMetadata::getName, identity())); - // Sorting to ensure that we can find cases of multiple referenceKeys within the same struct chain. Example - a.b and a.b.c - // By sorting we ensure that we have already seen a.b and can find that when we visit a.b.c - List sortedColumns = new ArrayList<>(columnEncryptionInformation.getColumnToKeyReference().keySet()); + Map columnMetadata = tableMetadata.getColumns().stream() + .collect(toImmutableMap(ColumnMetadata::getName, identity())); + // Sorting to ensure that we can find cases of multiple referenceKeys within the + // same struct chain. Example - a.b and a.b.c + // By sorting we ensure that we have already seen a.b and can find that when we + // visit a.b.c + List sortedColumns = new ArrayList<>( + columnEncryptionInformation.getColumnToKeyReference().keySet()); sortedColumns.sort(Comparator.comparing(ColumnWithStructSubfield::toString)); Set seenColumns = new HashSet<>(); for (ColumnWithStructSubfield columnWithSubfield : sortedColumns) { ColumnMetadata column = columnMetadata.get(columnWithSubfield.getColumnName()); if (column == null) { - throw new PrestoException(INVALID_TABLE_PROPERTY, format("In %s unable to find column %s", ENCRYPT_COLUMNS, columnWithSubfield.getColumnName())); + throw new PrestoException(INVALID_TABLE_PROPERTY, + format("In %s unable to find column %s", ENCRYPT_COLUMNS, columnWithSubfield.getColumnName())); } if (seenColumns.contains(columnWithSubfield.toString())) { - throw new PrestoException(INVALID_TABLE_PROPERTY, format("The same column/subfield cannot have 2 encryption keys")); + throw new PrestoException(INVALID_TABLE_PROPERTY, + format("The same column/subfield cannot have 2 encryption keys")); } if (columnWithSubfield.getSubfieldPath().isPresent()) { - Iterable subfieldPathFragments = Splitter.on(".").split(columnWithSubfield.getSubfieldPath().get()); + Iterable subfieldPathFragments = Splitter.on(".") + .split(columnWithSubfield.getSubfieldPath().get()); Type columnType = column.getType(); String parentPath = columnWithSubfield.getColumnName(); @@ -3510,13 +3813,16 @@ protected Optional getTableEncryptionPropertiesFromTa if (!(columnType instanceof RowType)) { throw new PrestoException( INVALID_TABLE_PROPERTY, - format("In %s subfields declared in %s, but %s has type %s", ENCRYPT_COLUMNS, columnWithSubfield.toString(), column.getName(), column.getType().getDisplayName())); + format("In %s subfields declared in %s, but %s has type %s", ENCRYPT_COLUMNS, + columnWithSubfield.toString(), column.getName(), + column.getType().getDisplayName())); } if (seenColumns.contains(parentPath)) { throw new 
PrestoException( INVALID_TABLE_PROPERTY, - format("For (%s) found a keyReference at a higher level field (%s)", columnWithSubfield.toString(), parentPath)); + format("For (%s) found a keyReference at a higher level field (%s)", + columnWithSubfield.toString(), parentPath)); } RowType row = (RowType) columnType; @@ -3526,7 +3832,9 @@ protected Optional getTableEncryptionPropertiesFromTa .map(RowType.Field::getType) .orElseThrow(() -> new PrestoException( INVALID_TABLE_PROPERTY, - format("In %s subfields declared in %s, but %s has type %s", ENCRYPT_COLUMNS, columnWithSubfield.toString(), column.getName(), column.getType().getDisplayName()))); + format("In %s subfields declared in %s, but %s has type %s", ENCRYPT_COLUMNS, + columnWithSubfield.toString(), column.getName(), + column.getType().getDisplayName()))); parentPath = format("%s.%s", parentPath, pathFragment); } @@ -3535,7 +3843,8 @@ protected Optional getTableEncryptionPropertiesFromTa seenColumns.add(columnWithSubfield.toString()); } - return Optional.of(getDwrfTableEncryptionProperties(Optional.empty(), Optional.of(columnEncryptionInformation), tableMetadata)); + return Optional.of(getDwrfTableEncryptionProperties(Optional.empty(), Optional.of(columnEncryptionInformation), + tableMetadata)); } private static DwrfTableEncryptionProperties getDwrfTableEncryptionProperties( @@ -3547,22 +3856,26 @@ private static DwrfTableEncryptionProperties getDwrfTableEncryptionProperties( String encryptionProvider = getDwrfEncryptionProvider(tableMetadata.getProperties()); if (encryptionAlgorithm == null) { - throw new PrestoException(INVALID_TABLE_PROPERTY, format("%s needs to be provided for DWRF encrypted tables", DWRF_ENCRYPTION_ALGORITHM)); + throw new PrestoException(INVALID_TABLE_PROPERTY, + format("%s needs to be provided for DWRF encrypted tables", DWRF_ENCRYPTION_ALGORITHM)); } if (encryptionProvider == null) { - throw new PrestoException(INVALID_TABLE_PROPERTY, format("%s needs to be provided for DWRF encrypted tables", DWRF_ENCRYPTION_PROVIDER)); + throw new PrestoException(INVALID_TABLE_PROPERTY, + format("%s needs to be provided for DWRF encrypted tables", DWRF_ENCRYPTION_PROVIDER)); } return encryptTable .map(s -> forTable(s, encryptionAlgorithm, encryptionProvider)) .orElseGet(() -> forPerColumn( - columnEncryptionInformation.orElseThrow(() -> new PrestoException(GENERIC_INTERNAL_ERROR, "columnEncryptionInformation cannot be empty")), + columnEncryptionInformation.orElseThrow(() -> new PrestoException(GENERIC_INTERNAL_ERROR, + "columnEncryptionInformation cannot be empty")), encryptionAlgorithm, encryptionProvider)); } - private static List getColumnHandles(ConnectorTableMetadata tableMetadata, Set partitionColumnNames, TypeTranslator typeTranslator) + private static List getColumnHandles(ConnectorTableMetadata tableMetadata, + Set partitionColumnNames, TypeTranslator typeTranslator) { validatePartitionColumns(tableMetadata); validateBucketColumns(tableMetadata); @@ -3627,12 +3940,15 @@ private static void validateCsvColumns(ConnectorTableMetadata tableMetadata) String joinedUnsupportedColumns = unsupportedColumns.stream() .map(columnMetadata -> format("%s %s", columnMetadata.getName(), columnMetadata.getType())) .collect(joining(", ")); - throw new PrestoException(NOT_SUPPORTED, "Hive CSV storage format only supports VARCHAR (unbounded). Unsupported columns: " + joinedUnsupportedColumns); + throw new PrestoException(NOT_SUPPORTED, + "Hive CSV storage format only supports VARCHAR (unbounded). 
Unsupported columns: " + + joinedUnsupportedColumns); } } @VisibleForTesting - static Function columnMetadataGetter(Table table, TypeManager typeManager, ColumnConverter columnConverter) + static Function columnMetadataGetter(Table table, TypeManager typeManager, + ColumnConverter columnConverter) { ImmutableList.Builder columnNames = ImmutableList.builder(); table.getPartitionColumns().stream().map(Column::getName).forEach(columnNames::add); @@ -3640,7 +3956,8 @@ static Function columnMetadataGetter(Table tab List allColumnNames = columnNames.build(); if (allColumnNames.size() > Sets.newHashSet(allColumnNames).size()) { throw new PrestoException(HIVE_INVALID_METADATA, - format("Hive metadata for table %s is invalid: Table descriptor contains duplicate columns", table.getTableName())); + format("Hive metadata for table %s is invalid: Table descriptor contains duplicate columns", + table.getTableName())); } List tableColumns = table.getDataColumns(); @@ -3671,7 +3988,8 @@ static Function columnMetadataGetter(Table tab return handle -> new ColumnMetadata( handle.getName(), - typeManager.getType(columnConverter.getTypeSignature(handle.getHiveType(), typeMetadata.getOrDefault(handle.getName(), Optional.empty()))), + typeManager.getType(columnConverter.getTypeSignature(handle.getHiveType(), + typeMetadata.getOrDefault(handle.getName(), Optional.empty()))), columnComment.get(handle.getName()).orElse(null), columnExtraInfo(handle.isPartitionKey()), handle.isHidden()); @@ -3690,7 +4008,8 @@ public void commit() } @Override - public TableLayoutFilterCoverage getTableLayoutFilterCoverage(ConnectorTableLayoutHandle connectorTableLayoutHandle, Set relevantPartitionColumns) + public TableLayoutFilterCoverage getTableLayoutFilterCoverage(ConnectorTableLayoutHandle connectorTableLayoutHandle, + Set relevantPartitionColumns) { HiveTableLayoutHandle tableHandle = (HiveTableLayoutHandle) connectorTableLayoutHandle; Set relevantColumns = tableHandle.getPartitionColumns().stream() @@ -3701,7 +4020,8 @@ public TableLayoutFilterCoverage getTableLayoutFilterCoverage(ConnectorTableLayo return NOT_APPLICABLE; } - return Sets.intersection(tableHandle.getPredicateColumns().keySet(), relevantColumns).isEmpty() ? NOT_COVERED : COVERED; + return Sets.intersection(tableHandle.getPredicateColumns().keySet(), relevantColumns).isEmpty() ? 
NOT_COVERED + : COVERED; } public static Optional getSourceTableNameFromSystemTable(SchemaTableName tableName) @@ -3712,7 +4032,8 @@ public static Optional getSourceTableNameFromSystemTable(Schema .findAny(); } - private static SystemTable createSystemTable(ConnectorTableMetadata metadata, Function, RecordCursor> cursor) + private static SystemTable createSystemTable(ConnectorTableMetadata metadata, + Function, RecordCursor> cursor) { return new SystemTable() { @@ -3729,7 +4050,8 @@ public ConnectorTableMetadata getTableMetadata() } @Override - public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) + public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, + TupleDomain constraint) { return cursor.apply(constraint); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveOutputTableHandle.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveOutputTableHandle.java index 8e90847823204..6220ff6860cfc 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveOutputTableHandle.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveOutputTableHandle.java @@ -34,6 +34,7 @@ public class HiveOutputTableHandle private final List partitionedBy; private final String tableOwner; private final Map additionalTableParameters; + private final boolean external; @JsonCreator public HiveOutputTableHandle( @@ -51,7 +52,8 @@ public HiveOutputTableHandle( @JsonProperty("preferredOrderingColumns") List preferredOrderingColumns, @JsonProperty("tableOwner") String tableOwner, @JsonProperty("additionalTableParameters") Map additionalTableParameters, - @JsonProperty("encryptionInformation") Optional encryptionInformation) + @JsonProperty("encryptionInformation") Optional encryptionInformation, + @JsonProperty("external") boolean external) { super( schemaName, @@ -70,6 +72,7 @@ public HiveOutputTableHandle( this.partitionedBy = ImmutableList.copyOf(requireNonNull(partitionedBy, "partitionedBy is null")); this.tableOwner = requireNonNull(tableOwner, "tableOwner is null"); this.additionalTableParameters = ImmutableMap.copyOf(requireNonNull(additionalTableParameters, "additionalTableParameters is null")); + this.external = external; } @JsonProperty @@ -89,4 +92,10 @@ public Map getAdditionalTableParameters() { return additionalTableParameters; } + + @JsonProperty + public boolean isExternal() + { + return external; + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/LocationService.java b/presto-hive/src/main/java/com/facebook/presto/hive/LocationService.java index b6d42925cf282..27056ebbb9e57 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/LocationService.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/LocationService.java @@ -25,7 +25,7 @@ public interface LocationService { - LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, boolean tempPathRequired); + LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, boolean tempPathRequired, Optional externalLocation); LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table, boolean tempPathRequired); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java 
b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index 816fa8da3e00f..a4bca4c6b7208 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -2823,7 +2823,7 @@ public void testTableCreationIgnoreExisting() try { try (Transaction transaction = newTransaction()) { LocationService locationService = getLocationService(); - LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, false); + LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, false, Optional.empty()); targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath(); Table table = createSimpleTable(schemaTableName, columns, session, targetPath, "q1"); transaction.getMetastore() @@ -3519,7 +3519,7 @@ public void testIllegalStorageFormatDuringTableScan() String tableOwner = session.getUser(); String schemaName = schemaTableName.getSchemaName(); String tableName = schemaTableName.getTableName(); - LocationHandle locationHandle = getLocationService().forNewTable(transaction.getMetastore(), session, schemaName, tableName, false); + LocationHandle locationHandle = getLocationService().forNewTable(transaction.getMetastore(), session, schemaName, tableName, false, Optional.empty()); Path targetPath = getLocationService().getQueryWriteInfo(locationHandle).getTargetPath(); //create table whose storage format is null Table.Builder tableBuilder = Table.builder() @@ -5334,7 +5334,7 @@ protected Table createEmptyTable(SchemaTableName schemaTableName, HiveStorageFor String tableName = schemaTableName.getTableName(); LocationService locationService = getLocationService(); - LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, false); + LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, false, Optional.empty()); targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath(); Table.Builder tableBuilder = Table.builder() diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveCreateExternalTable.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveCreateExternalTable.java new file mode 100644 index 0000000000000..90186e843448a --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveCreateExternalTable.java @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.hive; + +import com.facebook.presto.testing.MaterializedResult; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Optional; + +import static com.facebook.presto.tests.QueryAssertions.assertEqualsIgnoreOrder; +import static com.google.common.io.Files.createTempDir; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.airlift.tpch.TpchTable.NATION; +import static java.lang.String.format; +import static org.testng.Assert.assertTrue; + +public class TestHiveCreateExternalTable + extends AbstractTestQueryFramework +{ + protected QueryRunner createQueryRunner() + throws Exception + { + return HiveQueryRunner.createQueryRunner( + ImmutableList.of(NATION), + ImmutableMap.of(), + "sql-standard", + ImmutableMap.of( + "hive.non-managed-table-writes-enabled", "true"), + Optional.empty()); + } + + @Test + public void testCreateExternalTableWithData() + throws IOException + { + String tableName = "test_create_external"; + File tempDir = createTempDir(); + File tableLocation = new File(tempDir, tableName); + + @Language("SQL") + String createTableSql = format("" + + "CREATE TABLE test_create_external " + + "WITH (external_location = '%s') AS " + + "SELECT * FROM tpch.tiny.nation", + tableLocation.toURI().toASCIIString()); + + assertUpdate(createTableSql, 25); + + MaterializedResult expected = computeActual("SELECT * FROM tpch.tiny.nation"); + MaterializedResult actual = computeActual("SELECT * FROM " + tableName); + assertEqualsIgnoreOrder(actual.getMaterializedRows(), expected.getMaterializedRows()); + + String tablePath = (String) computeActual( + "SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*$', '/') FROM " + tableName) + .getOnlyValue(); + assertTrue(tablePath.startsWith(tableLocation.toURI().toString())); + + assertUpdate("DROP TABLE test_create_external"); + deleteRecursively(tempDir.toPath(), ALLOW_INSECURE); + } + + @Test + public void testCreateExternalTableAsWithExistingDirectory() + { + File tempDir = createTempDir(); + + @Language("SQL") + String createTableSql = format("" + + "CREATE TABLE test_create_external " + + "WITH (external_location = '%s') AS " + + "SELECT * FROM tpch.tiny.nation", + tempDir.toURI().toASCIIString()); + + assertQueryFails(createTableSql, "Target directory for table '.*' already exists:.*"); + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveCreateExternalTableDisabled.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveCreateExternalTableDisabled.java new file mode 100644 index 0000000000000..e3994c9b6de27 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveCreateExternalTableDisabled.java @@ -0,0 +1,78 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.Optional; + +import static com.google.common.io.Files.createTempDir; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.airlift.tpch.TpchTable.NATION; +import static java.lang.String.format; + +public class TestHiveCreateExternalTableDisabled + extends AbstractTestQueryFramework +{ + protected QueryRunner createQueryRunner() + throws Exception + { + return HiveQueryRunner.createQueryRunner( + ImmutableList.of(NATION), + ImmutableMap.of(), + "sql-standard", + ImmutableMap.of( + "hive.non-managed-table-writes-enabled", "true", + "hive.non-managed-table-creates-enabled", "false"), + Optional.empty()); + } + + @Test + public void testCreateExternalTableWithData() + throws Exception + { + File tempDir = createTempDir(); + + @Language("SQL") String createTableSql = format("" + + "CREATE TABLE test_create_external " + + "WITH (external_location = '%s') AS " + + "SELECT * FROM tpch.tiny.nation", + tempDir.toURI().toASCIIString()); + assertQueryFails(createTableSql, "Creating non-managed Hive tables is disabled"); + + deleteRecursively(tempDir.toPath(), ALLOW_INSECURE); + } + + @Test + public void testCreateExternalTable() + throws Exception + { + File tempDir = createTempDir(); + + @Language("SQL") String createTableSql = format("" + + "CREATE TABLE test_create_external (n TINYINT) " + + "WITH (external_location = '%s')", + tempDir.toURI().toASCIIString()); + assertQueryFails(createTableSql, "Cannot create non-managed Hive table"); + + deleteRecursively(tempDir.toPath(), ALLOW_INSECURE); + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java index 45dbb39783393..7a03880eb6838 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java @@ -62,6 +62,7 @@ import org.testng.annotations.Test; import java.io.File; +import java.io.IOException; import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDate; @@ -3096,6 +3097,22 @@ public void testCreateUnpartitionedTableAndQuery() assertEqualsIgnoreOrder(actualAfterTransaction, expected); } + @Test + public void testCreateExternalTableWithDataNotAllowed() + throws IOException + { + File tempDir = createTempDir(); + + @Language("SQL") String createTableSql = format("" + + "CREATE TABLE test_create_external " + + "WITH (external_location = '%s') AS " + + "SELECT * FROM tpch.tiny.nation", + tempDir.toURI().toASCIIString()); + + assertQueryFails(createTableSql, "Writes to non-managed Hive 
tables is disabled"); + deleteRecursively(tempDir.toPath(), ALLOW_INSECURE); + } + @Test public void testAddColumn() { diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java index 20a511f430e2c..660336387bb23 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java @@ -303,7 +303,8 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio ImmutableList.of(), "test", ImmutableMap.of(), - Optional.empty()); + Optional.empty(), + false); HdfsEnvironment hdfsEnvironment = createTestHdfsEnvironment(config, metastoreClientConfig); HivePageSinkProvider provider = new HivePageSinkProvider( getDefaultHiveFileWriterFactories(config, metastoreClientConfig), diff --git a/presto-product-tests/src/main/java/com/facebook/presto/tests/TestGroups.java b/presto-product-tests/src/main/java/com/facebook/presto/tests/TestGroups.java index b10e8befaf512..08065a71534f5 100644 --- a/presto-product-tests/src/main/java/com/facebook/presto/tests/TestGroups.java +++ b/presto-product-tests/src/main/java/com/facebook/presto/tests/TestGroups.java @@ -62,6 +62,7 @@ public final class TestGroups public static final String CANCEL_QUERY = "cancel_query"; public static final String BIG_QUERY = "big_query"; public static final String HIVE_TABLE_STATISTICS = "hive_table_statistics"; + public static final String HIVE_WITH_EXTERNAL_WRITES = "hive_with_external_writes"; public static final String KAFKA = "kafka"; public static final String AVRO = "avro"; diff --git a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveCreateExternalTable.java b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveCreateExternalTable.java new file mode 100644 index 0000000000000..97adbf009ff9d --- /dev/null +++ b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveCreateExternalTable.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.tests.hive; + +import com.google.inject.Inject; +import io.prestodb.tempto.ProductTest; +import io.prestodb.tempto.hadoop.hdfs.HdfsClient; +import org.testng.annotations.Test; + +import static com.facebook.presto.tests.TestGroups.HIVE_WITH_EXTERNAL_WRITES; +import static com.facebook.presto.tests.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.prestodb.tempto.query.QueryExecutor.query; +import static java.lang.String.format; + +public class TestHiveCreateExternalTable + extends ProductTest +{ + private static final String HIVE_CATALOG_NAME = "hive_with_external_writes"; + + @Inject + private HdfsClient hdfsClient; + + @Test(groups = {HIVE_WITH_EXTERNAL_WRITES, PROFILE_SPECIFIC_TESTS}) + public void testCreateExternalTableWithInaccessibleSchemaLocation() + { + String schema = "schema_without_location"; + String schemaLocation = "/tmp/" + schema; + hdfsClient.createDirectory(schemaLocation); + query(format("CREATE SCHEMA %s.%s WITH (location='%s')", + HIVE_CATALOG_NAME, schema, schemaLocation)); + + hdfsClient.delete(schemaLocation); + + String table = "test_create_external"; + String tableLocation = "/tmp/" + table; + query(format("CREATE TABLE %s.%s.%s WITH (external_location = '%s') AS " + + "SELECT * FROM tpch.tiny.nation", + HIVE_CATALOG_NAME, schema, table, tableLocation)); + } +}
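
Note on the widened LocationService entry point exercised by the test changes above: the sketch below is illustrative only and is not part of the patch. It assumes the element type of the new parameter is Optional<Path> (the generic parameter is not visible in the interface excerpt), assumes the usual package locations for the referenced Presto classes, and invents the helper name resolveNewTableTarget and the externalLocationUri argument purely for demonstration.

import com.facebook.presto.hive.LocationHandle;
import com.facebook.presto.hive.LocationService;
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.spi.ConnectorSession;
import org.apache.hadoop.fs.Path;

import java.util.Optional;

// Hypothetical caller of LocationService.forNewTable(...) after this change.
final class ExternalLocationExample
{
    private ExternalLocationExample() {}

    static Path resolveNewTableTarget(
            LocationService locationService,
            SemiTransactionalHiveMetastore metastore,
            ConnectorSession session,
            String schemaName,
            String tableName,
            Optional<String> externalLocationUri)
    {
        // Present for CREATE TABLE ... WITH (external_location = '...'); empty for managed tables,
        // matching the Optional.empty() call sites updated in AbstractTestHiveClient.
        Optional<Path> externalLocation = externalLocationUri.map(Path::new);

        LocationHandle handle = locationService.forNewTable(
                metastore,
                session,
                schemaName,
                tableName,
                false,              // tempPathRequired
                externalLocation);  // the parameter added by this change

        // Same pattern as the test call sites: read the chosen write target back from the handle.
        return locationService.getQueryWriteInfo(handle).getTargetPath();
    }
}

End to end, the new tests drive this path with CREATE TABLE ... WITH (external_location = '<uri>') AS SELECT ...; the statement succeeds when hive.non-managed-table-writes-enabled=true, fails with "Writes to non-managed Hive tables is disabled" when that property is unset, and fails with "Creating non-managed Hive tables is disabled" when hive.non-managed-table-creates-enabled=false.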