Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<dep.kafka.version>2.3.1</dep.kafka.version>
<dep.druid.version>0.17.0</dep.druid.version>
<dep.jaxb.version>2.3.1</dep.jaxb.version>

<dep.hudi.version>0.5.3</dep.hudi.version>
<!--
America/Bahia_Banderas has:
- offset change since 1970 (offset Jan 1970: -08:00, offset Jan 2018: -06:00)
Expand Down Expand Up @@ -1002,7 +1002,7 @@
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr</artifactId>
<version>0.5.1-incubating</version>
<version>${dep.hudi.version}</version>
<exclusions>
<exclusion>
<groupId>org.objenesis</groupId>
Expand Down
11 changes: 11 additions & 0 deletions presto-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-parquet</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-encoding</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -56,6 +62,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand All @@ -78,6 +88,7 @@
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr</artifactId>
<version>${dep.hudi.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

import java.io.BufferedReader;
import java.io.IOException;
Expand Down Expand Up @@ -371,7 +372,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
schedulerUsesHostAddresses,
partition.getEncryptionInformation());

if (!isHudiInputFormat(inputFormat) && shouldUseFileSplitsFromInputFormat(inputFormat)) {
if (!isHudiParquetInputFormat(inputFormat) && shouldUseFileSplitsFromInputFormat(inputFormat)) {
if (tableBucketInfo.isPresent()) {
throw new PrestoException(NOT_SUPPORTED, "Presto cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName());
}
Expand All @@ -381,7 +382,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)

return addSplitsToSource(splits, splitFactory);
}
PathFilter pathFilter = isHudiInputFormat(inputFormat) ? hoodiePathFilterSupplier.get() : path1 -> true;
PathFilter pathFilter = isHudiParquetInputFormat(inputFormat) ? hoodiePathFilterSupplier.get() : path1 -> true;
// S3 Select pushdown works at the granularity of individual S3 objects,
// therefore we must not split files when it is enabled.
Properties schema = getHiveSchema(storage.getSerdeParameters(), table.getParameters());
Expand Down Expand Up @@ -421,8 +422,11 @@ private ListenableFuture<?> addSplitsToSource(InputSplit[] targetSplits, Interna
return lastResult;
}

private static boolean isHudiInputFormat(InputFormat<?, ?> inputFormat)
private static boolean isHudiParquetInputFormat(InputFormat<?, ?> inputFormat)
{
if (inputFormat instanceof HoodieParquetRealtimeInputFormat) {
return false;
}
return inputFormat instanceof HoodieParquetInputFormat;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

Expand Down Expand Up @@ -58,7 +59,8 @@ public Optional<RecordCursor> createRecordCursor(
TupleDomain<HiveColumnHandle> effectivePredicate,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
boolean s3SelectPushdownEnabled)
boolean s3SelectPushdownEnabled,
Map<String, String> customSplitInfo)
{
// make sure the FileSystem is created with the proper Configuration object
try {
Expand All @@ -69,7 +71,7 @@ public Optional<RecordCursor> createRecordCursor(
}

RecordReader<?, ?> recordReader = hdfsEnvironment.doAs(session.getUser(),
() -> HiveUtil.createRecordReader(configuration, path, start, length, schema, columns));
() -> HiveUtil.createRecordReader(configuration, path, start, length, schema, columns, customSplitInfo));

return Optional.of(new GenericHiveRecordCursor<>(
configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public ConnectorPageSource createPageSource(
hiveLayout.getRemainingPredicate(),
hiveLayout.isPushdownFilterEnabled(),
rowExpressionService,
encryptionInformation);
encryptionInformation,
hiveSplit.getCustomSplitInfo());
if (pageSource.isPresent()) {
return pageSource.get();
}
Expand Down Expand Up @@ -318,7 +319,8 @@ public static Optional<ConnectorPageSource> createHivePageSource(
RowExpression remainingPredicate,
boolean isPushdownFilterEnabled,
RowExpressionService rowExpressionService,
Optional<EncryptionInformation> encryptionInformation)
Optional<EncryptionInformation> encryptionInformation,
Map<String, String> customSplitInfo)
{
List<HiveColumnHandle> allColumns;

Expand Down Expand Up @@ -422,7 +424,8 @@ public static Optional<ConnectorPageSource> createHivePageSource(
effectivePredicate,
hiveStorageTimeZone,
typeManager,
s3SelectPushdownEnabled);
s3SelectPushdownEnabled,
customSplitInfo);

if (cursor.isPresent()) {
RecordCursor delegate = cursor.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.joda.time.DateTimeZone;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

Expand All @@ -39,5 +40,6 @@ Optional<RecordCursor> createRecordCursor(
TupleDomain<HiveColumnHandle> effectivePredicate,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
boolean s3SelectPushdownEnabled);
boolean s3SelectPushdownEnabled,
Map<String, String> customSplitInfo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class HiveSplit
private final Optional<byte[]> extraFileInfo;
private final CacheQuotaRequirement cacheQuotaRequirement;
private final Optional<EncryptionInformation> encryptionInformation;
private final Map<String, String> customSplitInfo;

@JsonCreator
public HiveSplit(
Expand All @@ -82,7 +83,8 @@ public HiveSplit(
@JsonProperty("s3SelectPushdownEnabled") boolean s3SelectPushdownEnabled,
@JsonProperty("extraFileInfo") Optional<byte[]> extraFileInfo,
@JsonProperty("cacheQuota") CacheQuotaRequirement cacheQuotaRequirement,
@JsonProperty("encryptionMetadata") Optional<EncryptionInformation> encryptionInformation)
@JsonProperty("encryptionMetadata") Optional<EncryptionInformation> encryptionInformation,
@JsonProperty("customSplitInfo") Map<String, String> customSplitInfo)
{
checkArgument(start >= 0, "start must be positive");
checkArgument(length >= 0, "length must be positive");
Expand Down Expand Up @@ -123,6 +125,7 @@ public HiveSplit(
this.extraFileInfo = extraFileInfo;
this.cacheQuotaRequirement = cacheQuotaRequirement;
this.encryptionInformation = encryptionInformation;
this.customSplitInfo = ImmutableMap.copyOf(requireNonNull(customSplitInfo, "customSplitInfo is null"));
}

@JsonProperty
Expand Down Expand Up @@ -265,6 +268,12 @@ public Optional<EncryptionInformation> getEncryptionInformation()
return encryptionInformation;
}

@JsonProperty
public Map<String, String> getCustomSplitInfo()
{
return customSplitInfo;
}

@Override
public Object getInfo()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,8 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
internalSplit.isS3SelectPushdownEnabled(),
internalSplit.getExtraFileInfo(),
cacheQuotaRequirement,
internalSplit.getEncryptionInformation()));
internalSplit.getEncryptionInformation(),
internalSplit.getCustomSplitInfo()));

internalSplit.increaseStart(splitBytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@
import com.facebook.presto.hadoop.TextLineLengthLimitExceededException;
import com.facebook.presto.hive.avro.PrestoAvroSerDe;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.pagefile.PageInputFormat;
import com.facebook.presto.hive.util.FooterAwareRecordReader;
import com.facebook.presto.hive.util.HudiRealtimeSplitConverter;
import com.facebook.presto.orc.OrcReader;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -72,6 +75,7 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
Expand All @@ -83,11 +87,13 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -133,6 +139,7 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.checkCondition;
import static com.facebook.presto.hive.util.ConfigurationUtils.copy;
import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
import static com.facebook.presto.hive.util.CustomSplitConversionUtils.recreateSplitWithCustomInfo;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.MoreObjects.firstNonNull;
Expand All @@ -158,6 +165,7 @@
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_ALL_COLUMNS;
import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR;
import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;

public final class HiveUtil
Expand Down Expand Up @@ -202,7 +210,7 @@ private HiveUtil()
{
}

public static RecordReader<?, ?> createRecordReader(Configuration configuration, Path path, long start, long length, Properties schema, List<HiveColumnHandle> columns)
public static RecordReader<?, ?> createRecordReader(Configuration configuration, Path path, long start, long length, Properties schema, List<HiveColumnHandle> columns, Map<String, String> customSplitInfo)
{
// determine which hive columns we will read
List<HiveColumnHandle> readColumns = ImmutableList.copyOf(filter(columns, column -> column.getColumnType() == REGULAR));
Expand All @@ -211,14 +219,26 @@ private HiveUtil()
// Tell hive the columns we would like to read, this lets hive optimize reading column oriented files
setReadColumns(configuration, readHiveColumnIndexes);

// Only propagate serialization schema configs by default
Predicate<String> schemaFilter = schemaProperty -> schemaProperty.startsWith("serialization.");

InputFormat<?, ?> inputFormat = getInputFormat(configuration, getInputFormatName(schema), true);
JobConf jobConf = toJobConf(configuration);
FileSplit fileSplit = new FileSplit(path, start, length, (String[]) null);
if (!customSplitInfo.isEmpty() && isHudiRealtimeSplit(customSplitInfo)) {
fileSplit = recreateSplitWithCustomInfo(fileSplit, customSplitInfo);

// Add additional column information for record reader
List<String> readHiveColumnNames = ImmutableList.copyOf(transform(readColumns, HiveColumnHandle::getName));
jobConf.set(READ_COLUMN_NAMES_CONF_STR, Joiner.on(',').join(readHiveColumnNames));

// Remove filter when using customSplitInfo as the record reader requires complete schema configs
schemaFilter = schemaProperty -> true;
}

// propagate serialization configuration to getRecordReader
schema.stringPropertyNames().stream()
.filter(name -> name.startsWith("serialization."))
.forEach(name -> jobConf.set(name, schema.getProperty(name)));
.filter(schemaFilter)
.forEach(name -> jobConf.set(name, schema.getProperty(name)));

// add Airlift LZO and LZOP to head of codecs list so as to not override existing entries
List<String> codecs = newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split(jobConf.get("io.compression.codecs", "")));
Expand Down Expand Up @@ -261,6 +281,12 @@ private HiveUtil()
}
}

private static boolean isHudiRealtimeSplit(Map<String, String> customSplitInfo)
{
String customSplitClass = customSplitInfo.get(HudiRealtimeSplitConverter.CUSTOM_SPLIT_CLASS_KEY);
return HoodieRealtimeFileSplit.class.getName().equals(customSplitClass);
}

public static void setReadColumns(Configuration configuration, List<Integer> readHiveColumnIndexes)
{
configuration.set(READ_COLUMN_IDS_CONF_STR, Joiner.on(',').join(readHiveColumnIndexes));
Expand All @@ -285,7 +311,7 @@ public static Optional<CompressionCodec> getCompressionCodec(TextInputFormat inp
return Optional.ofNullable(compressionCodecFactory.getCodec(file));
}

static InputFormat<?, ?> getInputFormat(Configuration configuration, String inputFormatName, boolean symlinkTarget)
public static InputFormat<?, ?> getInputFormat(Configuration configuration, String inputFormatName, boolean symlinkTarget)
{
try {
JobConf jobConf = toJobConf(configuration);
Expand Down Expand Up @@ -328,6 +354,15 @@ static String getInputFormatName(Properties schema)
return name;
}

public static boolean shouldUseRecordReaderFromInputFormat(Configuration configuration, Storage storage)
{
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, storage.getStorageFormat().getInputFormat(), false);
return Arrays.stream(inputFormat.getClass().getAnnotations())
.map(Annotation::annotationType)
.map(Class::getSimpleName)
.anyMatch(name -> name.equals("UseRecordReaderFromInputFormat"));
}

public static long parseHiveDate(String value)
{
long millis = HIVE_DATE_PARSER.parseMillis(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.openjdk.jol.info.ClassLayout;

Expand Down Expand Up @@ -65,6 +66,7 @@ public class InternalHiveSplit
private final HiveSplitPartitionInfo partitionInfo;
private final Optional<byte[]> extraFileInfo;
private final Optional<EncryptionInformation> encryptionInformation;
private final Map<String, String> customSplitInfo;

private long start;
private int currentBlockIndex;
Expand All @@ -82,7 +84,8 @@ public InternalHiveSplit(
boolean s3SelectPushdownEnabled,
HiveSplitPartitionInfo partitionInfo,
Optional<byte[]> extraFileInfo,
Optional<EncryptionInformation> encryptionInformation)
Optional<EncryptionInformation> encryptionInformation,
Map<String, String> customSplitInfo)
{
checkArgument(start >= 0, "start must be positive");
checkArgument(end >= 0, "end must be positive");
Expand All @@ -106,6 +109,8 @@ public InternalHiveSplit(
this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
this.partitionInfo = partitionInfo;
this.extraFileInfo = extraFileInfo;
this.customSplitInfo = ImmutableMap
.copyOf(requireNonNull(customSplitInfo, "customSplitInfo is null"));

ImmutableList.Builder<List<HostAddress>> addressesBuilder = ImmutableList.builder();
blockEndOffsets = new long[blocks.size()];
Expand Down Expand Up @@ -222,6 +227,11 @@ public Optional<EncryptionInformation> getEncryptionInformation()
return this.encryptionInformation;
}

public Map<String, String> getCustomSplitInfo()
{
return customSplitInfo;
}

public void reset()
{
currentBlockIndex = 0;
Expand Down
Loading