docs/changelog/143940.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 143940
summary: Push stats to external source via metadata
type: enhancement
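
For orientation, here is a minimal sketch of the statistics SPI as implied by its call sites in the two readers below. This is a hypothetical reconstruction, not the actual source: the real interface lives in org.elasticsearch.xpack.esql.datasources.spi and its exact shape may differ. Empty Optional/OptionalLong values signal "unknown" rather than zero, which lets a reader report only what its format actually stores.

import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical sketch of the SPI, reconstructed from usage in this diff.
public interface SourceStatistics {
    OptionalLong rowCount();
    OptionalLong sizeInBytes();
    Optional<Map<String, ColumnStatistics>> columnStatistics();

    interface ColumnStatistics {
        OptionalLong nullCount();
        OptionalLong distinctCount();
        Optional<Object> minValue();
        Optional<Object> maxValue();
    }
}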
OrcFormatReader.java
@@ -15,9 +15,13 @@
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.DoubleColumnStatistics;
import org.apache.orc.IntegerColumnStatistics;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.StringColumnStatistics;
import org.apache.orc.TypeDescription;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
@@ -34,6 +38,7 @@
import org.elasticsearch.xpack.esql.datasources.spi.FormatReader;
import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata;
import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata;
import org.elasticsearch.xpack.esql.datasources.spi.SourceStatistics;
import org.elasticsearch.xpack.esql.datasources.spi.StorageObject;

import java.io.IOException;
@@ -44,6 +49,8 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;

/**
* {@link FormatReader} implementation for Apache ORC files.
@@ -71,18 +78,98 @@ public OrcFormatReader(BlockFactory blockFactory) {

@Override
public SourceMetadata metadata(StorageObject object) throws IOException {
OrcStorageObjectAdapter fs = new OrcStorageObjectAdapter(object);
Path path = new Path(object.path().toString());
OrcFile.ReaderOptions options = OrcFile.readerOptions(new Configuration(false)).filesystem(fs);
try (Reader reader = OrcFile.createReader(path, options)) {
TypeDescription schema = reader.getSchema();
List<Attribute> attributes = convertOrcSchemaToAttributes(schema);
SourceStatistics statistics = extractStatistics(reader, schema);
return new SimpleSourceMetadata(attributes, formatName(), object.path().toString(), statistics, null);
}
}

private static SourceStatistics extractStatistics(Reader reader, TypeDescription schema) {
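// Builds file-level statistics from the ORC footer: row count, content length, and per-column min/max/null counts.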
long rowCount = reader.getNumberOfRows();
long sizeInBytes = reader.getContentLength();
ColumnStatistics[] orcStats = reader.getStatistics();
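// ORC file statistics are indexed by column id; id 0 is the root struct, so top-level fields carry ids >= 1.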
List<String> fieldNames = schema.getFieldNames();
List<TypeDescription> children = schema.getChildren();

Map<String, SourceStatistics.ColumnStatistics> columnStats = new HashMap<>();
for (int i = 0; i < fieldNames.size(); i++) {
String name = fieldNames.get(i);
int colId = children.get(i).getId();
if (colId >= orcStats.length) {
continue;
}
ColumnStatistics cs = orcStats[colId];
long totalValues = cs.getNumberOfValues();
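// getNumberOfValues() counts non-null values, so the null count of a top-level column is rows minus values.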
long nullCount = rowCount - totalValues;
Object minVal = extractOrcMin(cs);
Object maxVal = extractOrcMax(cs);

columnStats.put(name, new SourceStatistics.ColumnStatistics() {
@Override
public OptionalLong nullCount() {
return OptionalLong.of(nullCount);
}

@Override
public OptionalLong distinctCount() {
return OptionalLong.empty();
}

@Override
public Optional<Object> minValue() {
return Optional.ofNullable(minVal);
}

@Override
public Optional<Object> maxValue() {
return Optional.ofNullable(maxVal);
}
});
}

return new SourceStatistics() {
@Override
public OptionalLong rowCount() {
return OptionalLong.of(rowCount);
}

@Override
public OptionalLong sizeInBytes() {
return OptionalLong.of(sizeInBytes);
}

@Override
public Optional<Map<String, SourceStatistics.ColumnStatistics>> columnStatistics() {
return columnStats.isEmpty() ? Optional.empty() : Optional.of(columnStats);
}
};
}

private static Object extractOrcMin(ColumnStatistics cs) {
if (cs instanceof IntegerColumnStatistics intStats) {
return intStats.getMinimum();
} else if (cs instanceof DoubleColumnStatistics dblStats) {
return dblStats.getMinimum();
} else if (cs instanceof StringColumnStatistics strStats) {
return strStats.getMinimum();
}
return null;
}

private static Object extractOrcMax(ColumnStatistics cs) {
if (cs instanceof IntegerColumnStatistics intStats) {
return intStats.getMaximum();
} else if (cs instanceof DoubleColumnStatistics dblStats) {
return dblStats.getMaximum();
} else if (cs instanceof StringColumnStatistics strStats) {
return strStats.getMaximum();
}
return null;
}

@Override
ParquetFormatReader.java
@@ -13,8 +13,10 @@
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.api.Converter;
@@ -38,6 +40,7 @@
import org.elasticsearch.xpack.esql.datasources.spi.FormatReader;
import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata;
import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata;
import org.elasticsearch.xpack.esql.datasources.spi.SourceStatistics;
import org.elasticsearch.xpack.esql.datasources.spi.StorageObject;

import java.io.IOException;
@@ -47,6 +50,8 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;

/**
* FormatReader implementation for Parquet files.
@@ -72,24 +77,113 @@ public ParquetFormatReader(BlockFactory blockFactory) {

@Override
public SourceMetadata metadata(StorageObject object) throws IOException {
// Adapt StorageObject to Parquet InputFile
InputFile parquetInputFile = new ParquetStorageObjectAdapter(object);

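// Row-group metadata is now required for statistics, so the previous SKIP_ROW_GROUPS metadata filter no longer applies.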
ParquetReadOptions options = ParquetReadOptions.builder().build();

try (ParquetFileReader reader = ParquetFileReader.open(parquetInputFile, options)) {
FileMetaData fileMetaData = reader.getFileMetaData();
MessageType parquetSchema = fileMetaData.getSchema();
List<Attribute> schema = convertParquetSchemaToAttributes(parquetSchema);
SourceStatistics statistics = extractStatistics(reader, parquetSchema);
return new SimpleSourceMetadata(schema, formatName(), object.path().toString(), statistics, null);
}
}

@SuppressWarnings("rawtypes")
private SourceStatistics extractStatistics(ParquetFileReader reader, MessageType schema) {
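// Aggregates footer metadata across all row groups; returns null when the file has none.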
List<BlockMetaData> rowGroups = reader.getRowGroups();
if (rowGroups.isEmpty()) {
return null;
}

long totalRows = 0;
long totalSize = 0;
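// Per-column accumulators keyed by dotted column path; single-element arrays allow in-place merging below.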
Map<String, long[]> nullCounts = new HashMap<>();
Map<String, Comparable[]> mins = new HashMap<>();
Map<String, Comparable[]> maxs = new HashMap<>();

for (BlockMetaData rowGroup : rowGroups) {
totalRows += rowGroup.getRowCount();
totalSize += rowGroup.getTotalByteSize();
for (ColumnChunkMetaData col : rowGroup.getColumns()) {
String colName = col.getPath().toDotString();
Statistics stats = col.getStatistics();
if (stats == null || stats.isEmpty()) {
continue;
}
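// Note: getNumNulls() may report -1 when the writer did not record null counts; such chunks would skew this sum.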
nullCounts.merge(colName, new long[] { stats.getNumNulls() }, (a, b) -> {
a[0] += b[0];
return a;
});
if (stats.hasNonNullValue()) {
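// Merge min/max across row groups via raw Comparable; values within a single column share a type, so the comparison is well-defined.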
mins.merge(colName, new Comparable[] { stats.genericGetMin() }, (a, b) -> {
@SuppressWarnings("unchecked")
int cmp = a[0].compareTo(b[0]);
if (cmp > 0) a[0] = b[0];
return a;
});
maxs.merge(colName, new Comparable[] { stats.genericGetMax() }, (a, b) -> {
@SuppressWarnings("unchecked")
int cmp = a[0].compareTo(b[0]);
if (cmp < 0) a[0] = b[0];
return a;
});
}
}
}

final long rowCount = totalRows;
final long sizeBytes = totalSize;
Map<String, SourceStatistics.ColumnStatistics> columnStats = new HashMap<>();
for (Type field : schema.getFields()) {
String name = field.getName();
long[] nc = nullCounts.get(name);
Comparable[] mn = mins.get(name);
Comparable[] mx = maxs.get(name);
if (nc != null || mn != null || mx != null) {
final long nullCount = nc != null ? nc[0] : 0;
final Object minVal = mn != null ? mn[0] : null;
final Object maxVal = mx != null ? mx[0] : null;
columnStats.put(name, new SourceStatistics.ColumnStatistics() {
@Override
public OptionalLong nullCount() {
return OptionalLong.of(nullCount);
}

@Override
public OptionalLong distinctCount() {
return OptionalLong.empty();
}

@Override
public Optional<Object> minValue() {
return Optional.ofNullable(minVal);
}

@Override
public Optional<Object> maxValue() {
return Optional.ofNullable(maxVal);
}
});
}
}

return new SourceStatistics() {
@Override
public OptionalLong rowCount() {
return OptionalLong.of(rowCount);
}

@Override
public OptionalLong sizeInBytes() {
return OptionalLong.of(sizeBytes);
}

@Override
public Optional<Map<String, ColumnStatistics>> columnStatistics() {
return columnStats.isEmpty() ? Optional.empty() : Optional.of(columnStats);
}
};
}

@Override