-
Notifications
You must be signed in to change notification settings - Fork 5.5k
feat(plugin-hive): Add support for skip_header_line_count and skip_footer_line_count table properties #26446
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -235,10 +235,12 @@ | |
| import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable; | ||
| import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics; | ||
| import static com.facebook.presto.hive.HiveStorageFormat.AVRO; | ||
| import static com.facebook.presto.hive.HiveStorageFormat.CSV; | ||
| import static com.facebook.presto.hive.HiveStorageFormat.DWRF; | ||
| import static com.facebook.presto.hive.HiveStorageFormat.ORC; | ||
| import static com.facebook.presto.hive.HiveStorageFormat.PAGEFILE; | ||
| import static com.facebook.presto.hive.HiveStorageFormat.PARQUET; | ||
| import static com.facebook.presto.hive.HiveStorageFormat.TEXTFILE; | ||
| import static com.facebook.presto.hive.HiveStorageFormat.values; | ||
| import static com.facebook.presto.hive.HiveTableProperties.AVRO_SCHEMA_URL; | ||
| import static com.facebook.presto.hive.HiveTableProperties.BUCKETED_BY_PROPERTY; | ||
|
|
@@ -255,6 +257,8 @@ | |
| import static com.facebook.presto.hive.HiveTableProperties.ORC_BLOOM_FILTER_FPP; | ||
| import static com.facebook.presto.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY; | ||
| import static com.facebook.presto.hive.HiveTableProperties.PREFERRED_ORDERING_COLUMNS; | ||
| import static com.facebook.presto.hive.HiveTableProperties.SKIP_FOOTER_LINE_COUNT; | ||
| import static com.facebook.presto.hive.HiveTableProperties.SKIP_HEADER_LINE_COUNT; | ||
| import static com.facebook.presto.hive.HiveTableProperties.SORTED_BY_PROPERTY; | ||
| import static com.facebook.presto.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY; | ||
| import static com.facebook.presto.hive.HiveTableProperties.getAvroSchemaUrl; | ||
|
|
@@ -265,6 +269,8 @@ | |
| import static com.facebook.presto.hive.HiveTableProperties.getEncryptColumns; | ||
| import static com.facebook.presto.hive.HiveTableProperties.getEncryptTable; | ||
| import static com.facebook.presto.hive.HiveTableProperties.getExternalLocation; | ||
| import static com.facebook.presto.hive.HiveTableProperties.getFooterSkipCount; | ||
| import static com.facebook.presto.hive.HiveTableProperties.getHeaderSkipCount; | ||
| import static com.facebook.presto.hive.HiveTableProperties.getHiveStorageFormat; | ||
| import static com.facebook.presto.hive.HiveTableProperties.getOrcBloomFilterColumns; | ||
| import static com.facebook.presto.hive.HiveTableProperties.getOrcBloomFilterFpp; | ||
|
|
@@ -412,6 +418,9 @@ public class HiveMetadata | |
| private static final String CSV_QUOTE_KEY = OpenCSVSerde.QUOTECHAR; | ||
| private static final String CSV_ESCAPE_KEY = OpenCSVSerde.ESCAPECHAR; | ||
|
|
||
| public static final String SKIP_HEADER_COUNT_KEY = "skip.header.line.count"; | ||
| public static final String SKIP_FOOTER_COUNT_KEY = "skip.footer.line.count"; | ||
|
|
||
| private static final JsonCodec<MaterializedViewDefinition> MATERIALIZED_VIEW_JSON_CODEC = jsonCodec(MaterializedViewDefinition.class); | ||
|
|
||
| private final boolean allowCorruptWritesForTesting; | ||
|
|
@@ -753,6 +762,12 @@ private ConnectorTableMetadata getTableMetadata(Optional<Table> table, SchemaTab | |
| properties.put(AVRO_SCHEMA_URL, avroSchemaUrl); | ||
| } | ||
|
|
||
| // Textfile and CSV specific properties | ||
| getSerdeProperty(table.get(), SKIP_HEADER_COUNT_KEY) | ||
| .ifPresent(skipHeaderCount -> properties.put(SKIP_HEADER_LINE_COUNT, Integer.valueOf(skipHeaderCount))); | ||
|
sourcery-ai[bot] marked this conversation as resolved.
|
||
| getSerdeProperty(table.get(), SKIP_FOOTER_COUNT_KEY) | ||
| .ifPresent(skipFooterCount -> properties.put(SKIP_FOOTER_LINE_COUNT, Integer.valueOf(skipFooterCount))); | ||
|
|
||
| // CSV specific property | ||
| getCsvSerdeProperty(table.get(), CSV_SEPARATOR_KEY) | ||
| .ifPresent(csvSeparator -> properties.put(CSV_SEPARATOR, csvSeparator)); | ||
|
|
@@ -1067,6 +1082,9 @@ private Table prepareTable(ConnectorSession session, ConnectorTableMetadata tabl | |
| else if (tableType.equals(MANAGED_TABLE) || tableType.equals(MATERIALIZED_VIEW)) { | ||
| LocationHandle locationHandle = locationService.forNewTable(metastore, session, schemaName, tableName, isTempPathRequired(session, bucketProperty, preferredOrderingColumns)); | ||
| targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath(); | ||
| if (getFooterSkipCount(tableMetadata.getProperties()).isPresent()) { | ||
| throw new PrestoException(NOT_SUPPORTED, format("Cannot create non external table with %s property", SKIP_FOOTER_COUNT_KEY)); | ||
| } | ||
| } | ||
| else { | ||
| throw new IllegalStateException(format("%s is not a valid table type to be created.", tableType)); | ||
|
|
@@ -1294,20 +1312,42 @@ private Map<String, String> getEmptyTableProperties( | |
| tableProperties.put(AVRO_SCHEMA_URL_KEY, validateAndNormalizeAvroSchemaUrl(avroSchemaUrl, hdfsContext)); | ||
| } | ||
|
|
||
| // Textfile and CSV specific properties | ||
| Set<HiveStorageFormat> csvAndTextFile = ImmutableSet.of(TEXTFILE, CSV); | ||
| getHeaderSkipCount(tableMetadata.getProperties()).ifPresent(headerSkipCount -> { | ||
| if (headerSkipCount > 0) { | ||
| checkFormatForProperty(hiveStorageFormat, csvAndTextFile, SKIP_HEADER_LINE_COUNT); | ||
| tableProperties.put(SKIP_HEADER_COUNT_KEY, String.valueOf(headerSkipCount)); | ||
| } | ||
| if (headerSkipCount < 0) { | ||
| throw new PrestoException(HIVE_INVALID_METADATA, format("Invalid value for %s property: %s", SKIP_HEADER_LINE_COUNT, headerSkipCount)); | ||
| } | ||
| }); | ||
|
|
||
| getFooterSkipCount(tableMetadata.getProperties()).ifPresent(footerSkipCount -> { | ||
| if (footerSkipCount > 0) { | ||
| checkFormatForProperty(hiveStorageFormat, csvAndTextFile, SKIP_FOOTER_LINE_COUNT); | ||
| tableProperties.put(SKIP_FOOTER_COUNT_KEY, String.valueOf(footerSkipCount)); | ||
| } | ||
| if (footerSkipCount < 0) { | ||
| throw new PrestoException(HIVE_INVALID_METADATA, format("Invalid value for %s property: %s", SKIP_FOOTER_LINE_COUNT, footerSkipCount)); | ||
| } | ||
| }); | ||
|
|
||
| // CSV specific properties | ||
| getCsvProperty(tableMetadata.getProperties(), CSV_ESCAPE) | ||
| .ifPresent(escape -> { | ||
| checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.CSV, CSV_ESCAPE); | ||
| checkFormatForProperty(hiveStorageFormat, CSV, CSV_ESCAPE); | ||
| tableProperties.put(CSV_ESCAPE_KEY, escape.toString()); | ||
| }); | ||
| getCsvProperty(tableMetadata.getProperties(), CSV_QUOTE) | ||
| .ifPresent(quote -> { | ||
| checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.CSV, CSV_QUOTE); | ||
| checkFormatForProperty(hiveStorageFormat, CSV, CSV_QUOTE); | ||
| tableProperties.put(CSV_QUOTE_KEY, quote.toString()); | ||
| }); | ||
| getCsvProperty(tableMetadata.getProperties(), CSV_SEPARATOR) | ||
| .ifPresent(separator -> { | ||
| checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.CSV, CSV_SEPARATOR); | ||
| checkFormatForProperty(hiveStorageFormat, CSV, CSV_SEPARATOR); | ||
| tableProperties.put(CSV_SEPARATOR_KEY, separator.toString()); | ||
| }); | ||
|
|
||
|
|
@@ -1327,6 +1367,13 @@ private static void checkFormatForProperty(HiveStorageFormat actualStorageFormat | |
| } | ||
| } | ||
|
|
||
| private static void checkFormatForProperty(HiveStorageFormat actualStorageFormat, Set<HiveStorageFormat> expectedStorageFormats, String propertyName) | ||
| { | ||
| if (!expectedStorageFormats.contains(actualStorageFormat)) { | ||
| throw new PrestoException(INVALID_TABLE_PROPERTY, format("Cannot specify %s table property for storage format: %s", propertyName, actualStorageFormat)); | ||
| } | ||
| } | ||
|
|
||
| private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context) | ||
| { | ||
| try { | ||
|
|
@@ -1647,7 +1694,15 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto | |
| if (getAvroSchemaUrl(tableMetadata.getProperties()) != null) { | ||
| throw new PrestoException(NOT_SUPPORTED, "CREATE TABLE AS not supported when Avro schema url is set"); | ||
| } | ||
| getHeaderSkipCount(tableMetadata.getProperties()).ifPresent(headerSkipCount -> { | ||
| if (headerSkipCount > 1) { | ||
| throw new PrestoException(NOT_SUPPORTED, format("CREATE TABLE AS not supported when the value of %s property is greater than 1", SKIP_HEADER_COUNT_KEY)); | ||
| } | ||
| }); | ||
|
|
||
| getFooterSkipCount(tableMetadata.getProperties()).ifPresent(footerSkipCount -> { | ||
| throw new PrestoException(NOT_SUPPORTED, format("Property %s is not supported with CREATE TABLE AS", SKIP_FOOTER_COUNT_KEY)); | ||
| }); | ||
|
Comment on lines
+1697
to
+1705
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just to confirm, does this mean that we can create a table(
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I also feel we should update the error message to
Similar change for the footer, too.
auden-woolfson marked this conversation as resolved.
|
||
| HiveStorageFormat tableStorageFormat = getHiveStorageFormat(tableMetadata.getProperties()); | ||
| List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties()); | ||
| Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties()); | ||
|
|
@@ -2016,6 +2071,15 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn | |
| locationHandle = locationService.forExistingTable(metastore, session, table, tempPathRequired); | ||
| } | ||
|
|
||
| Optional.ofNullable(table.getParameters().get(SKIP_HEADER_COUNT_KEY)).map(Integer::parseInt).ifPresent(headerSkipCount -> { | ||
| if (headerSkipCount > 1) { | ||
| throw new PrestoException(NOT_SUPPORTED, format("INSERT into %s Hive table with value of %s property greater than 1 is not supported", tableName, SKIP_HEADER_COUNT_KEY)); | ||
| } | ||
| }); | ||
| if (table.getParameters().containsKey(SKIP_FOOTER_COUNT_KEY)) { | ||
| throw new PrestoException(NOT_SUPPORTED, format("INSERT into %s Hive table with %s property not supported", tableName, SKIP_FOOTER_COUNT_KEY)); | ||
| } | ||
|
|
||
| Optional<? extends TableEncryptionProperties> tableEncryptionProperties = getTableEncryptionPropertiesFromHiveProperties(table.getParameters(), tableStorageFormat); | ||
|
|
||
| HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat : getHiveStorageFormat(session); | ||
|
|
@@ -3676,7 +3740,7 @@ else if (column.isHidden()) { | |
|
|
||
| private static void validateCsvColumns(ConnectorTableMetadata tableMetadata) | ||
| { | ||
| if (getHiveStorageFormat(tableMetadata.getProperties()) != HiveStorageFormat.CSV) { | ||
| if (getHiveStorageFormat(tableMetadata.getProperties()) != CSV) { | ||
| return; | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.