ParquetMetadataConverter.java
@@ -36,6 +36,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.CorruptStatistics;
import org.apache.parquet.Log;
import org.apache.parquet.format.PageEncodingStats;
@@ -81,6 +82,20 @@ public class ParquetMetadataConverter {

private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);

private final boolean useSignedStringMinMax;
Member:
how about: useSignedMinMaxForUnsignedValues

Contributor Author:
I think this is more accurate. The override only works for String values (utf8, enum, json) because those are the only types where it can be safe to use the signed min and max.

Member:
please add this as a javadoc comment here.

Contributor:
Given that this controls all binaries except enum/unsigned int, can we rename this something like useSignedBinaryMinMax?

Contributor Author:
This isn't for binaries, it's for all string representations.

Would be nice to add a comment on this only working for strings.

Contributor Author (@rdblue, Oct 10, 2016):
I think the field name is pretty clear. What information would you like to see added in a comment?


public ParquetMetadataConverter() {
Member (@julienledem, Sep 21, 2016):
If this is for backward compatibility, mark it as deprecated.

Contributor (@isnotinvain, Oct 4, 2016):
+1

edit: still figuring out github's new review system. this was supposed to be +1 to adding a deprecated annotation to this if it's only here for compatibility.

Contributor Author:
Why should this be deprecated?

It's perfectly valid to create a ParquetMetadataConverter without any configuration. It could easily be that when we restructure this into parquet-common, it's the other constructor that goes away, so I don't think we should consider it deprecated.

this(false);
}

public ParquetMetadataConverter(Configuration conf) {
this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
Contributor:
can this key live in a public static constant? and again here, maybe use 'binary' instead of 'string'?

Contributor Author:
We removed the public constant to avoid leaking this into the public API and not being able to remove it. Multiple packages reference it, so it would need to be public. I think it's fine to use a String for now.

I'm a little torn about keeping this as a string. It's easy to mess things up in that case. Would it be an option to add it as a constant with a comment / annotation indicating it is liable to change?

Contributor Author:
The problem is that there is no good place to add this and we don't want to add it to the public API. The long-term fix will very likely move this option outside of the converter (returning the min/max and the sort order used during aggregation), and it isn't something we want to expose in ParquetInputFormat or ParquetReader, or else people will turn it on without understanding what it does. I agree it isn't ideal, but I think this is fine until we have a permanent solution to the problem.

}
Member (@julienledem, Sep 21, 2016):
remove this constructor and just have the call to isSignedStringMinMaxEnabled where this is constructed.


private ParquetMetadataConverter(boolean useSignedStringMinMax) {
this.useSignedStringMinMax = useSignedStringMinMax;
}
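
As a usage note, here is a minimal caller-side sketch of the configuration-driven constructor above (the ConverterSetup class is hypothetical, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.format.converter.ParquetMetadataConverter;

public class ConverterSetup {
  public static ParquetMetadataConverter newConverter() {
    Configuration conf = new Configuration();
    // Only safe when every string column in the target files contains pure
    // ASCII data; the converter will then trust signed min/max for strings.
    conf.setBoolean("parquet.strings.signed-min-max.enabled", true);
    return new ParquetMetadataConverter(conf);
  }
}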

// NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
// sets of encodings. It is important that all collections inserted to this cache be
// immutable and have thread-safe read-only access. This can be achieved by wrapping
@@ -308,14 +323,28 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics
return fromParquetStatistics(null, statistics, type);
}

/**
* @deprecated Use {@link #fromParquetStatistics(String, Statistics, PrimitiveType)} instead.
*/
@Deprecated
public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics
(String createdBy, Statistics statistics, PrimitiveTypeName type) {
return fromParquetStatisticsInternal(createdBy, statistics, type, defaultSortOrder(type));
}

// Visible for testing
static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal
(String createdBy, Statistics statistics, PrimitiveTypeName type, SortOrder typeSortOrder) {
// create stats object based on the column type
org.apache.parquet.column.statistics.Statistics stats = org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType(type);
// If no statistics were written to the footer, create an empty Statistics object and return it

// NOTE: See docs in CorruptStatistics for explanation of why this check is needed

might also want to add a line on the sort order bit.

Contributor Author:
Done.

if (statistics != null && !CorruptStatistics.shouldIgnoreStatistics(createdBy, type)) {
// The sort order is checked to avoid returning min/max stats that are not
// valid with the type's sort order. Currently, all stats are aggregated
// using a signed ordering, which isn't valid for strings or unsigned ints.
if (statistics != null && !CorruptStatistics.shouldIgnoreStatistics(createdBy, type) &&
SortOrder.SIGNED == typeSortOrder) {
if (statistics.isSetMax() && statistics.isSetMin()) {
stats.setMinMaxFromBytes(statistics.min.array(), statistics.max.array());
}
@@ -324,6 +353,112 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics
return stats;
}
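
To make the sort-order guard concrete, here is a hedged sketch of the deprecated static path (the values, including the createdBy string, are hypothetical; it assumes the byte[] setters of the thrift-generated org.apache.parquet.format.Statistics):

import java.nio.charset.StandardCharsets;
import org.apache.parquet.format.Statistics;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

public class DroppedStatsDemo {
  public static void main(String[] args) {
    Statistics footerStats = new Statistics();
    footerStats.setMin("apple".getBytes(StandardCharsets.UTF_8));
    footerStats.setMax("pear".getBytes(StandardCharsets.UTF_8));

    // BINARY's default sort order is UNSIGNED, but footer stats were
    // aggregated with SIGNED comparisons, so the min/max are discarded:
    org.apache.parquet.column.statistics.Statistics stats =
        ParquetMetadataConverter.fromParquetStatistics(
            "parquet-mr version 1.8.0 (build abcdef)", footerStats,
            PrimitiveTypeName.BINARY);
    System.out.println(stats.hasNonNullValue()); // false: nothing for filters
  }
}

The same call with PrimitiveTypeName.INT32 would keep the min/max, since the default sort order for INT32 is SIGNED (see defaultSortOrder below).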

public org.apache.parquet.column.statistics.Statistics fromParquetStatistics(
String createdBy, Statistics statistics, PrimitiveType type) {
SortOrder expectedOrder = overrideSortOrderToSigned(type) ?
SortOrder.SIGNED : sortOrder(type);
return fromParquetStatisticsInternal(
createdBy, statistics, type.getPrimitiveTypeName(), expectedOrder);
}

/**
* Sort order for page and column statistics. Types are associated with sort
* orders (e.g., UTF8 columns should use UNSIGNED) and column stats are
* aggregated using a sort order. As of parquet-format version 2.3.1, the
* order used to aggregate stats is always SIGNED and is not stored in the
* Parquet file. These stats are discarded for types that need unsigned.
*
* See PARQUET-686.
*/
enum SortOrder {

add a couple of lines of javadoc to indicate what this struct is for? Also what is the convention we follow for types like enums / nested classes - should they be declared at the top of the class? Or anywhere goes? (personally prefer the top)

Contributor Author:
This is internal and will be removed (or moved into common and made public) when we add the long-term fix. That's why I'm keeping all of these classes together. That way the chance of a conflict is minimized.

SIGNED,
UNSIGNED,
UNKNOWN
}

private static final Set<OriginalType> STRING_TYPES = Collections

move to top?

.unmodifiableSet(new HashSet<>(Arrays.asList(
OriginalType.UTF8, OriginalType.ENUM, OriginalType.JSON
)));

/**
* Returns whether to use signed order min and max with a type. It is safe to
* use signed min and max when the type is a string type and contains only
* ASCII characters (where the sign bit was 0). This checks whether the type
* is a string type and uses {@code useSignedStringMinMax} to determine if
* only ASCII characters were written.
*
* @param type a primitive type with a logical type annotation
* @return true if signed order min/max can be used with this type
*/
private boolean overrideSortOrderToSigned(PrimitiveType type) {
Contributor:
this is adding a lot of complication / corner cases. Do we really need to support using signed ordering for ascii only strings? It's especially confusing that we allow this for un-annotated binary columns (we just assume they are string-ish, which doesn't seem valid).

It seems like this could be simplified by simply returning the found min/max and the ordering with which they were written and letting the filter layer decide what to do there? I think you mentioned that maybe that's the plan going forward, if it is, can we mark all these extra methods around dealing with overrides as temporary / to be deleted soon?

Contributor Author:
Impala writes Strings as unannotated binary, so we should consider it string-ish if useSignedStringMinMax is set.

While passing stats and the sort order should be the approach we take, I think it will require some design and thought to make sure we don't pass invalid stats to applications. If we simply added the sort order to the current stats objects, engines like Presto would keep using the bad stats without checking it. This commit will prevent applications from getting stats and then we can take the time to fix it correctly.

// even if the override is set, only return stats for string-ish types
// a null type annotation is considered string-ish because some writers
// failed to use the UTF8 annotation.
OriginalType annotation = type.getOriginalType();
return useSignedStringMinMax &&
PrimitiveTypeName.BINARY == type.getPrimitiveTypeName() &&
(annotation == null || STRING_TYPES.contains(annotation));
}
Member (@julienledem, Sep 16, 2016):
Why not simply:

private boolean isMinMaxUsableFor(PrimitiveType type) {
  SortOrder expected = sortOrder(type);
  return (expected == SIGNED) || (expected == UNSIGNED && useSignedMinMaxForUnsigned);
}

the case for getOriginalType is already treated in sortOrder(). Why add another layer here?

Contributor Author:
The other method resolves the correct sort order for a logical type. This method determines whether the incorrect sort order can be used. This will only use the signed sort order for string-based types, not for unsigned integer types.

Member:
I don't see why we restrict it further since the flag is already about lifting the restriction, but that's fine with me if you feel we need this.

couple more comments:

  • The name of the method could be clearer. What it actually does is control whether we're going to override the result of sortOrder().
  • The way the logic is organized here is a little hard to read. If you define the accepted String types:

private static final Set<OriginalType> acceptedStringTypes = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
    UTF8, ENUM, JSON
)));

then it boils down to a boolean expression that I find easier to understand:

boolean overrideSortOrderToUnsigned(PrimitiveType type) {
  return useSignedStringMinMax
      && (type.getPrimitiveTypeName() == BINARY)
      && (type.getOriginalType() == null || acceptedStringTypes.contains(type.getOriginalType()));
}
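
For intuition on why ASCII-only data makes the signed override safe, here is a small standalone demo (illustrative only, not part of the patch): signed and unsigned byte comparisons agree exactly while the sign bit is 0.

public class SignBitDemo {
  public static void main(String[] args) {
    byte a = 'a';            // 0x61, sign bit 0 (ASCII)
    byte z = 'z';            // 0x7A, sign bit 0 (ASCII)
    byte lead = (byte) 0xC3; // lead byte of a UTF-8 multi-byte char, sign bit 1

    // For pure ASCII, signed and unsigned byte comparison agree:
    System.out.println((a < z) == ((a & 0xFF) < (z & 0xFF))); // true

    // Once the sign bit is set they diverge: (byte) 0xC3 == -61, so a signed
    // comparison orders it before 'a', while the correct unsigned order is after:
    System.out.println(lead < a);                   // true  (signed, wrong for UTF-8)
    System.out.println((lead & 0xFF) < (a & 0xFF)); // false (unsigned, correct)
  }
}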


/**
* @param primitive a primitive physical type
* @return the default sort order used when the logical type is not known
*/
private static SortOrder defaultSortOrder(PrimitiveTypeName primitive) {
Member:
please add javadoc: "used when the type has no OriginalType annotation"

Contributor Author:
Will do.

Contributor:
can you add comments explaining what is meant by default sort order? why are non-binary types involved in this?

Contributor Author:
Default sort order for a primitive type is the order we think is appropriate when the logical type isn't known. This is only used for backward-compatibility with the existing static methods to convert stats.

switch (primitive) {
case BOOLEAN:
case INT32:
case INT64:
case FLOAT:
case DOUBLE:
return SortOrder.SIGNED;
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
case INT96: // only used for timestamp, which uses unsigned values
return SortOrder.UNSIGNED;
}
return SortOrder.UNKNOWN;
}

/**
* @param primitive a primitive type with a logical type annotation
* @return the "correct" sort order of the type that applications assume
*/
private static SortOrder sortOrder(PrimitiveType primitive) {
Member:
s/sortOrder/expectedSortOrder ?

Contributor Author (@rdblue, Sep 17, 2016):
This should return the correct order for the type, not an expected order. I wouldn't use "expected" because the order for the file is known (it will be explicit or is signed) and the correct order is determined by the primitive type. I can see how passing this later as the expected order is strange, though. I'll update that to typeSortOrder to be more clear.

OriginalType annotation = primitive.getOriginalType();
if (annotation != null) {
switch (annotation) {
case INT_8:
case INT_16:
case INT_32:
case INT_64:
case DATE:
case TIME_MICROS:
case TIME_MILLIS:
case TIMESTAMP_MICROS:
case TIMESTAMP_MILLIS:
return SortOrder.SIGNED;
case UINT_8:
case UINT_16:
case UINT_32:
case UINT_64:
case ENUM:
case UTF8:
case BSON:
case JSON:
return SortOrder.UNSIGNED;
case DECIMAL:
case LIST:
case MAP:
case MAP_KEY_VALUE:
case INTERVAL:
return SortOrder.UNKNOWN;
}
}
return defaultSortOrder(primitive.getPrimitiveTypeName());
}

public PrimitiveTypeName getPrimitive(Type type) {
switch (type) {
case BYTE_ARRAY: // TODO: rename BINARY and remove this switch
@@ -687,7 +822,7 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
fromParquetStatistics(
parquetMetadata.getCreated_by(),
metaData.statistics,
messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
messageType.getType(path.toArray()).asPrimitiveType()),
metaData.data_page_offset,
metaData.dictionary_page_offset,
metaData.num_values,
ParquetFileReader.java

@@ -24,7 +24,6 @@
import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
@@ -53,6 +52,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -95,6 +95,7 @@
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.schema.PrimitiveType;

/**
* Internal implementation of the Parquet file reader as a block container
@@ -108,7 +109,7 @@ public class ParquetFileReader implements Closeable {

public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";

private static ParquetMetadataConverter converter = new ParquetMetadataConverter();
private final ParquetMetadataConverter converter;

/**
* for files provided, check if there's a summary file.
@@ -444,9 +445,19 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
*/
public static final ParquetMetadata readFooter(
InputFile file, MetadataFilter filter) throws IOException {
ParquetMetadataConverter converter;
// TODO: remove this temporary work-around.
// this is necessary to pass the Configuration to ParquetMetadataConverter
// and should be removed when there is a non-Hadoop configuration.
if (file instanceof HadoopInputFile) {
converter = new ParquetMetadataConverter(
((HadoopInputFile) file).getConfiguration());
} else {
converter = new ParquetMetadataConverter();
}
try (SeekableInputStream in = file.newStream()) {
return readFooter(converter, file.getLength(), file.toString(),
in, filter);

return readFooter(converter, file.getLength(), file.toString(), in, filter);
}
}
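
As a caller-side sketch of this work-around in action (the FooterReadDemo class is hypothetical, and the package locations of HadoopInputFile and ParquetMetadata are assumed from this diff's imports):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

public class FooterReadDemo {
  public static ParquetMetadata readFooterWithSignedStrings(String path, Configuration conf)
      throws IOException {
    conf.setBoolean("parquet.strings.signed-min-max.enabled", true);
    // fromPath captures the Configuration (see HadoopInputFile below), so
    // readFooter can hand it to the ParquetMetadataConverter it constructs.
    InputFile file = HadoopInputFile.fromPath(new Path(path), conf);
    return ParquetFileReader.readFooter(file, ParquetMetadataConverter.NO_FILTER);
  }
}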

@@ -538,6 +549,7 @@ public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
public ParquetFileReader(
Configuration configuration, FileMetaData fileMetaData,
Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
this.converter = new ParquetMetadataConverter(configuration);
this.conf = configuration;
this.fileMetaData = fileMetaData;
FileSystem fs = filePath.getFileSystem(configuration);
@@ -569,6 +581,7 @@ private ParquetFileReader(Configuration configuration, Path file) throws IOException {
* @throws IOException if the file can not be opened
*/
public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException {
this.converter = new ParquetMetadataConverter(conf);
this.conf = conf;
FileSystem fs = file.getFileSystem(conf);
this.fileStatus = fs.getFileStatus(file);
@@ -592,6 +605,7 @@ public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException {
* @throws IOException if the file can not be opened
*/
public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) throws IOException {
this.converter = new ParquetMetadataConverter(conf);
this.conf = conf;
FileSystem fs = file.getFileSystem(conf);
this.fileStatus = fs.getFileStatus(file);
@@ -781,7 +795,7 @@ DictionaryPage readDictionary(ColumnChunkMetaData meta) throws IOException {
compressedPage.getEncoding());
}
Member:
we should open a JIRA to factor out those constructors and clean up the deprecated ones.
lots of duplication here.

Contributor Author:
Agreed, but I think we're waiting for a breaking API version to do it.


private static DictionaryPage readCompressedDictionary(
private DictionaryPage readCompressedDictionary(
PageHeader pageHeader, SeekableInputStream fin) throws IOException {
DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();

@@ -843,6 +857,8 @@ protected PageHeader readPageHeader() throws IOException {
public ColumnChunkPageReader readAllPages() throws IOException {
List<DataPage> pagesInChunk = new ArrayList<DataPage>();
DictionaryPage dictionaryPage = null;
PrimitiveType type = getFileMetaData().getSchema()
.getType(descriptor.col.getPath()).asPrimitiveType();
long valuesCountReadSoFar = 0;
while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
PageHeader pageHeader = readPageHeader();
Expand Down Expand Up @@ -870,10 +886,10 @@ public ColumnChunkPageReader readAllPages() throws IOException {
this.readAsBytesInput(compressedPageSize),
dataHeaderV1.getNum_values(),
uncompressedPageSize,
fromParquetStatistics(
converter.fromParquetStatistics(
getFileMetaData().getCreatedBy(),
dataHeaderV1.getStatistics(),
descriptor.col.getType()),
type),
converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
converter.getEncoding(dataHeaderV1.getEncoding())
Expand All @@ -893,10 +909,10 @@ public ColumnChunkPageReader readAllPages() throws IOException {
converter.getEncoding(dataHeaderV2.getEncoding()),
this.readAsBytesInput(dataSize),
uncompressedPageSize,
fromParquetStatistics(
converter.fromParquetStatistics(
getFileMetaData().getCreatedBy(),
dataHeaderV2.getStatistics(),
descriptor.col.getType()),
type),
dataHeaderV2.isIs_compressed()
));
valuesCountReadSoFar += dataHeaderV2.getNum_values();
HadoopInputFile.java

@@ -31,22 +31,28 @@ public class HadoopInputFile implements InputFile {

private final FileSystem fs;
private final FileStatus stat;
private final Configuration conf;

public static HadoopInputFile fromPath(Path path, Configuration conf)
throws IOException {
FileSystem fs = path.getFileSystem(conf);
return new HadoopInputFile(fs, fs.getFileStatus(path));
return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
}

public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf)
throws IOException {
FileSystem fs = stat.getPath().getFileSystem(conf);
return new HadoopInputFile(fs, stat);
return new HadoopInputFile(fs, stat, conf);
}

private HadoopInputFile(FileSystem fs, FileStatus stat) {
private HadoopInputFile(FileSystem fs, FileStatus stat, Configuration conf) {
this.fs = fs;
this.stat = stat;
this.conf = conf;
}

public Configuration getConfiguration() {
return conf;
}

@Override