diff --git a/lib/trino-hive-formats/pom.xml b/lib/trino-hive-formats/pom.xml
index 64e917a421cf..e658c72d4cd5 100644
--- a/lib/trino-hive-formats/pom.xml
+++ b/lib/trino-hive-formats/pom.xml
@@ -39,6 +39,11 @@
true
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>log</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>slice</artifactId>
@@ -81,6 +86,11 @@
             <artifactId>joda-time</artifactId>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.gaul</groupId>
             <artifactId>modernizer-maven-annotations</artifactId>
@@ -113,6 +123,13 @@
             <scope>test</scope>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-main</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-testing-services</artifactId>
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/UnionToRowCoercionUtils.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/UnionToRowCoercionUtils.java
new file mode 100644
index 000000000000..69ef8cc4edd0
--- /dev/null
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/UnionToRowCoercionUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.hive.formats;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeSignature;
+import io.trino.spi.type.TypeSignatureParameter;
+
+import java.util.List;
+
+import static io.trino.spi.type.TinyintType.TINYINT;
+import static io.trino.spi.type.TypeSignatureParameter.namedField;
+
+/**
+ * Utilities for coercing an Avro/Hive union type into a Trino row type of the shape
+ * {@code row(tag tinyint, field0 T0, field1 T1, ...)}: a tinyint discriminator followed
+ * by one nullable field per union member.
+ */
+public final class UnionToRowCoercionUtils
+{
+    public static final String UNION_FIELD_TAG_NAME = "tag";
+    public static final String UNION_FIELD_FIELD_PREFIX = "field";
+    public static final Type UNION_FIELD_TAG_TYPE = TINYINT;
+
+    private UnionToRowCoercionUtils() {}
+
+    /**
+     * Builds the row type for a union of the given member types: the "tag" field
+     * followed by one "fieldN" entry per union member, in member order.
+     */
+    public static RowType rowTypeForUnionOfTypes(List<Type> types)
+    {
+        ImmutableList.Builder<RowType.Field> fields = ImmutableList.<RowType.Field>builder()
+                .add(RowType.field(UNION_FIELD_TAG_NAME, UNION_FIELD_TAG_TYPE));
+        for (int i = 0; i < types.size(); i++) {
+            fields.add(RowType.field(UNION_FIELD_FIELD_PREFIX + i, types.get(i)));
+        }
+        return RowType.from(fields.build());
+    }
+
+    /**
+     * Type-signature variant of {@link #rowTypeForUnionOfTypes}; produces the same
+     * tag/fieldN layout from raw type signatures.
+     */
+    public static TypeSignature rowTypeSignatureForUnionOfTypes(List<TypeSignature> typeSignatures)
+    {
+        ImmutableList.Builder<TypeSignatureParameter> fields = ImmutableList.builder();
+        fields.add(namedField(UNION_FIELD_TAG_NAME, UNION_FIELD_TAG_TYPE.getTypeSignature()));
+        for (int i = 0; i < typeSignatures.size(); i++) {
+            fields.add(namedField(UNION_FIELD_FIELD_PREFIX + i, typeSignatures.get(i)));
+        }
+        return TypeSignature.rowType(fields.build());
+    }
+}
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFileReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFileReader.java
new file mode 100644
index 000000000000..ace474bb58e2
--- /dev/null
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFileReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.hive.formats.avro;
+
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.hive.formats.TrinoDataInputStream;
+import io.trino.spi.Page;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.function.Function;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Reads an Avro container file into Trino {@link Page}s. Optionally restricted to the
+ * byte range given by {@code offset}/{@code length}; range boundaries are aligned to
+ * Avro sync points, matching standard Avro split semantics.
+ */
+public class AvroFileReader
+        implements Closeable
+{
+    private final TrinoDataInputStream input;
+    private final AvroPageDataReader dataReader;
+    private final DataFileReader<Optional<Page>> fileReader;
+    private Page nextPage;
+    // exclusive end of the split in file bytes; empty means read to end of file
+    private final OptionalLong end;
+
+    public AvroFileReader(
+            TrinoInputFile inputFile,
+            Schema schema,
+            AvroTypeManager avroTypeManager)
+            throws IOException, AvroTypeException
+    {
+        this(inputFile, schema, avroTypeManager, 0, OptionalLong.empty());
+    }
+
+    /**
+     * @param offset start of the split; reading begins at the first sync point at or after it
+     * @param length optional split length (at least 1); offset + length must not exceed the file size
+     * @throws AvroTypeException if the schema cannot be mapped to Trino types
+     */
+    public AvroFileReader(
+            TrinoInputFile inputFile,
+            Schema schema,
+            AvroTypeManager avroTypeManager,
+            long offset,
+            OptionalLong length)
+            throws IOException, AvroTypeException
+    {
+        requireNonNull(inputFile, "inputFile is null");
+        requireNonNull(schema, "schema is null");
+        requireNonNull(avroTypeManager, "avroTypeManager is null");
+        long fileSize = inputFile.length();
+
+        verify(offset >= 0, "offset is negative");
+        // use the cached fileSize rather than re-querying inputFile.length()
+        verify(offset < fileSize, "offset is greater than data size");
+        length.ifPresent(lengthLong -> verify(lengthLong >= 1, "length must be at least 1"));
+        end = length.stream().map(l -> l + offset).findFirst();
+        end.ifPresent(endLong -> verify(endLong <= fileSize, "offset plus length is greater than data size"));
+        input = new TrinoDataInputStream(inputFile.newStream());
+        dataReader = new AvroPageDataReader(schema, avroTypeManager);
+        try {
+            fileReader = new DataFileReader<>(new TrinoDataInputStreamAsAvroSeekableInput(input, fileSize), dataReader);
+            fileReader.sync(offset);
+        }
+        catch (AvroPageDataReader.UncheckedAvroTypeException runtimeWrapper) {
+            // Avro Datum Reader interface can't throw checked exceptions when initialized by the file reader,
+            // so the exception is wrapped in a runtime exception that must be unwrapped
+            throw runtimeWrapper.getAvroTypeException();
+        }
+        // expose file metadata (e.g. logical-type annotations) to the type manager
+        avroTypeManager.configure(fileReader.getMetaKeys().stream().collect(toImmutableMap(Function.identity(), fileReader::getMeta)));
+    }
+
+    public long getCompletedBytes()
+    {
+        return input.getReadBytes();
+    }
+
+    public long getReadTimeNanos()
+    {
+        return input.getReadTimeNanos();
+    }
+
+    public boolean hasNext()
+            throws IOException
+    {
+        loadNextPageIfNecessary();
+        return nextPage != null;
+    }
+
+    public Page next()
+            throws IOException
+    {
+        if (!hasNext()) {
+            throw new IOException("No more pages available from Avro file");
+        }
+        Page result = nextPage;
+        nextPage = null;
+        return result;
+    }
+
+    private void loadNextPageIfNecessary()
+            throws IOException
+    {
+        // keep decoding records until the page builder fills up, the split is exhausted,
+        // or the file ends; a record may not produce a page until the builder is full
+        while (nextPage == null && (end.isEmpty() || !fileReader.pastSync(end.getAsLong())) && fileReader.hasNext()) {
+            try {
+                nextPage = fileReader.next().orElse(null);
+            }
+            catch (AvroRuntimeException e) {
+                throw new IOException(e);
+            }
+        }
+        if (nextPage == null) {
+            // emit any partially filled page buffered in the data reader
+            nextPage = dataReader.flush().orElse(null);
+        }
+    }
+
+    @Override
+    public void close()
+            throws IOException
+    {
+        fileReader.close();
+    }
+
+    // Adapts TrinoDataInputStream to Avro's SeekableInput so DataFileReader can seek to sync markers
+    private record TrinoDataInputStreamAsAvroSeekableInput(TrinoDataInputStream inputStream, long fileSize)
+            implements SeekableInput
+    {
+        TrinoDataInputStreamAsAvroSeekableInput
+        {
+            requireNonNull(inputStream, "inputStream is null");
+        }
+
+        @Override
+        public void seek(long p)
+                throws IOException
+        {
+            inputStream.seek(p);
+        }
+
+        @Override
+        public long tell()
+                throws IOException
+        {
+            return inputStream.getPos();
+        }
+
+        @Override
+        public long length()
+        {
+            return fileSize;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len)
+                throws IOException
+        {
+            return inputStream.read(b, off, len);
+        }
+
+        @Override
+        public void close()
+                throws IOException
+        {
+            inputStream.close();
+        }
+    }
+}
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java
new file mode 100644
index 000000000000..4c4c08f7e052
--- /dev/null
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java
@@ -0,0 +1,1052 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.hive.formats.avro;
+
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.spi.Page;
+import io.trino.spi.PageBuilder;
+import io.trino.spi.block.BlockBuilder;
+import io.trino.spi.block.SingleRowBlockWriter;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.Type;
+import org.apache.avro.Resolver;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.FastReaderBuilder;
+import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
+import org.apache.avro.util.internal.Accessor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.IntFunction;
+import java.util.stream.IntStream;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verify;
+import static io.trino.hive.formats.UnionToRowCoercionUtils.UNION_FIELD_TAG_TYPE;
+import static io.trino.hive.formats.avro.AvroTypeUtils.isSimpleNullableUnion;
+import static io.trino.hive.formats.avro.AvroTypeUtils.typeFromAvro;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.Float.floatToRawIntBits;
+import static java.util.Objects.requireNonNull;
+
+public class AvroPageDataReader
+ implements DatumReader>
+{
+ // same limit as org.apache.avro.io.BinaryDecoder
+ private static final long MAX_ARRAY_SIZE = (long) Integer.MAX_VALUE - 8L;
+
+ private final Schema readerSchema;
+ private Schema writerSchema;
+ private final PageBuilder pageBuilder;
+ private RowBlockBuildingDecoder rowBlockBuildingDecoder;
+ private final AvroTypeManager typeManager;
+
+ public AvroPageDataReader(Schema readerSchema, AvroTypeManager typeManager)
+ throws AvroTypeException
+ {
+ this.readerSchema = requireNonNull(readerSchema, "readerSchema is null");
+ writerSchema = this.readerSchema;
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ try {
+ Type readerSchemaType = typeFromAvro(this.readerSchema, typeManager);
+ verify(readerSchemaType instanceof RowType, "Root Avro type must be a row");
+ pageBuilder = new PageBuilder(readerSchemaType.getTypeParameters());
+ initialize();
+ }
+ catch (org.apache.avro.AvroTypeException e) {
+ throw new AvroTypeException(e);
+ }
+ }
+
+    // (Re)builds the row decoder from the current writer/reader schema pair;
+    // called at construction and again from setSchema() when the writer schema changes
+    private void initialize()
+            throws AvroTypeException
+    {
+        verify(readerSchema.getType() == Schema.Type.RECORD, "Avro schema for page reader must be record");
+        verify(writerSchema.getType() == Schema.Type.RECORD, "File Avro schema for page reader must be record");
+        rowBlockBuildingDecoder = new RowBlockBuildingDecoder(writerSchema, readerSchema, typeManager);
+    }
+
+    // DatumReader callback: the Avro file reader supplies the writer schema read from file
+    // metadata. The DatumReader interface does not allow checked exceptions here, so schema
+    // resolution failures are wrapped in UncheckedAvroTypeException for the caller to unwrap.
+    @Override
+    public void setSchema(Schema schema)
+    {
+        requireNonNull(schema, "schema is null");
+        if (schema != writerSchema) {
+            writerSchema = schema;
+            try {
+                initialize();
+            }
+            catch (org.apache.avro.AvroTypeException e) {
+                throw new UncheckedAvroTypeException(new AvroTypeException(e));
+            }
+            catch (AvroTypeException e) {
+                throw new UncheckedAvroTypeException(e);
+            }
+        }
+    }
+
+    /**
+     * Decodes one record into the internal page builder. Returns a page only when the
+     * builder becomes full; otherwise returns empty and buffers the rows (see {@link #flush}).
+     */
+    @Override
+    public Optional<Page> read(Optional<Page> ignoredReuse, Decoder decoder)
+            throws IOException
+    {
+        Optional<Page> page = Optional.empty();
+        rowBlockBuildingDecoder.decodeIntoPageBuilder(decoder, pageBuilder);
+        if (pageBuilder.isFull()) {
+            page = Optional.of(pageBuilder.build());
+            pageBuilder.reset();
+        }
+        return page;
+    }
+
+    /**
+     * Returns the partially filled page buffered by {@link #read}, if any rows remain;
+     * call after the last record has been decoded.
+     */
+    public Optional<Page> flush()
+    {
+        if (!pageBuilder.isEmpty()) {
+            Optional<Page> lastPage = Optional.of(pageBuilder.build());
+            pageBuilder.reset();
+            return lastPage;
+        }
+        return Optional.empty();
+    }
+
+    // Base class for decoders that read one Avro value from the decoder stream
+    // and append it to a Trino BlockBuilder
+    private abstract static class BlockBuildingDecoder
+    {
+        protected abstract void decodeIntoBlock(Decoder decoder, BlockBuilder builder)
+                throws IOException;
+    }
+
+    /**
+     * Maps an Avro schema-resolution {@link Resolver.Action} (writer vs reader schema)
+     * to the block-building decoder that materializes the resolved value into Trino blocks.
+     * A type-manager override, when present, takes precedence over the built-in decoders.
+     */
+    private static BlockBuildingDecoder createBlockBuildingDecoderForAction(Resolver.Action action, AvroTypeManager typeManager)
+            throws AvroTypeException
+    {
+        Optional<BiConsumer<BlockBuilder, Object>> consumer = typeManager.overrideBuildingFunctionForSchema(action.reader);
+        if (consumer.isPresent()) {
+            return new UserDefinedBlockBuildingDecoder(action.reader, action.writer, consumer.get());
+        }
+        return switch (action.type) {
+            case DO_NOTHING -> switch (action.reader.getType()) {
+                case NULL -> NullBlockBuildingDecoder.INSTANCE;
+                case BOOLEAN -> BooleanBlockBuildingDecoder.INSTANCE;
+                case INT -> IntBlockBuildingDecoder.INSTANCE;
+                case LONG -> new LongBlockBuildingDecoder();
+                case FLOAT -> new FloatBlockBuildingDecoder();
+                case DOUBLE -> new DoubleBlockBuildingDecoder();
+                case STRING -> StringBlockBuildingDecoder.INSTANCE;
+                case BYTES -> BytesBlockBuildingDecoder.INSTANCE;
+                case FIXED -> new FixedBlockBuildingDecoder(action.reader.getFixedSize());
+                // these reader types covered by special action types
+                case ENUM, ARRAY, MAP, RECORD, UNION -> throw new IllegalStateException("Do Nothing action type not compatible with reader schema type " + action.reader.getType());
+            };
+            case PROMOTE -> switch (action.reader.getType()) {
+                // only certain types valid to promote into as determined by org.apache.avro.Resolver.Promote.isValid
+                case LONG -> new LongBlockBuildingDecoder(getLongPromotionFunction(action.writer));
+                case FLOAT -> new FloatBlockBuildingDecoder(getFloatPromotionFunction(action.writer));
+                case DOUBLE -> new DoubleBlockBuildingDecoder(getDoublePromotionFunction(action.writer));
+                case STRING -> {
+                    if (action.writer.getType() == Schema.Type.BYTES) {
+                        yield StringBlockBuildingDecoder.INSTANCE;
+                    }
+                    throw new AvroTypeException("Unable to promote to String from type " + action.writer.getType());
+                }
+                case BYTES -> {
+                    if (action.writer.getType() == Schema.Type.STRING) {
+                        yield BytesBlockBuildingDecoder.INSTANCE;
+                    }
+                    throw new AvroTypeException("Unable to promote to Bytes from type " + action.writer.getType());
+                }
+                case NULL, BOOLEAN, INT, FIXED, ENUM, ARRAY, MAP, RECORD, UNION ->
+                        throw new AvroTypeException("Promotion action not allowed for reader schema type " + action.reader.getType());
+            };
+            case CONTAINER -> switch (action.reader.getType()) {
+                case ARRAY -> new ArrayBlockBuildingDecoder((Resolver.Container) action, typeManager);
+                case MAP -> new MapBlockBuildingDecoder((Resolver.Container) action, typeManager);
+                default -> throw new AvroTypeException("Not possible to have container action type with non container reader schema " + action.reader.getType());
+            };
+            case RECORD -> new RowBlockBuildingDecoder(action, typeManager);
+            case ENUM -> new EnumBlockBuildingDecoder((Resolver.EnumAdjust) action);
+            case WRITER_UNION -> {
+                // simple nullable unions map to a nullable Trino type; other unions are coerced into a tagged row
+                if (isSimpleNullableUnion(action.reader)) {
+                    yield new WriterUnionBlockBuildingDecoder((Resolver.WriterUnion) action, typeManager);
+                }
+                else {
+                    yield new WriterUnionCoercedIntoRowBlockBuildingDecoder((Resolver.WriterUnion) action, typeManager);
+                }
+            }
+            case READER_UNION -> {
+                if (isSimpleNullableUnion(action.reader)) {
+                    yield createBlockBuildingDecoderForAction(((Resolver.ReaderUnion) action).actualAction, typeManager);
+                }
+                else {
+                    yield new ReaderUnionCoercedIntoRowBlockBuildingDecoder((Resolver.ReaderUnion) action, typeManager);
+                }
+            }
+            case ERROR -> throw new AvroTypeException("Resolution action returned with error " + action);
+            case SKIP -> throw new IllegalStateException("Skips filtered by row step");
+        };
+    }
+
+ // Different plugins may have different Avro Schema to Type mappings
+ // that are currently transforming GenericDatumReader returned objects into their target type during the record reading process
+ // This block building decoder allows plugin writers to port that code directly and use within this reader
+ // This mechanism is used to enhance Avro longs into timestamp types according to schema metadata
+ private static class UserDefinedBlockBuildingDecoder
+ extends BlockBuildingDecoder
+ {
+ private final BiConsumer userBuilderFunction;
+ private final DatumReader