Native Avro File Reader #17221
Merged: electrum merged 5 commits into trinodb:master from jklamer:jklamer/AvroHiveFileFormatReader on Jun 21, 2023.
Changes from all commits (5 commits):
- `e15e0a6` Test avro upcasting timestamps (jklamer)
- `dd7f67b` Add Native Avro to Page code with connector defined mappings (jklamer)
- `10f4a48` Refactor out hive split error utility function into HiveUtils (jklamer)
- `95140b6` Hive Avro Native Reader (jklamer)
- `35e959d` Move union to row coersion logic to hive-file-formats (jklamer)
`lib/trino-hive-formats/src/main/java/io/trino/hive/formats/UnionToRowCoercionUtils.java` (54 additions, 0 deletions)

```java
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.hive.formats;

import com.google.common.collect.ImmutableList;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.TypeSignatureParameter;

import java.util.List;

import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.TypeSignatureParameter.namedField;

public final class UnionToRowCoercionUtils
{
    public static final String UNION_FIELD_TAG_NAME = "tag";
    public static final String UNION_FIELD_FIELD_PREFIX = "field";
    public static final Type UNION_FIELD_TAG_TYPE = TINYINT;

    private UnionToRowCoercionUtils() {}

    public static RowType rowTypeForUnionOfTypes(List<Type> types)
    {
        ImmutableList.Builder<RowType.Field> fields = ImmutableList.<RowType.Field>builder()
                .add(RowType.field(UNION_FIELD_TAG_NAME, UNION_FIELD_TAG_TYPE));
        for (int i = 0; i < types.size(); i++) {
            fields.add(RowType.field(UNION_FIELD_FIELD_PREFIX + i, types.get(i)));
        }
        return RowType.from(fields.build());
    }

    public static TypeSignature rowTypeSignatureForUnionOfTypes(List<TypeSignature> typeSignatures)
    {
        ImmutableList.Builder<TypeSignatureParameter> fields = ImmutableList.builder();
        fields.add(namedField(UNION_FIELD_TAG_NAME, UNION_FIELD_TAG_TYPE.getTypeSignature()));
        for (int i = 0; i < typeSignatures.size(); i++) {
            fields.add(namedField(UNION_FIELD_FIELD_PREFIX + i, typeSignatures.get(i)));
        }
        return TypeSignature.rowType(fields.build());
    }
}
```
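The coercion above turns an Avro union into a Trino row whose first field is a `tinyint` tag (recording which branch is populated) followed by one field per union branch, named `field0`, `field1`, and so on. A minimal plain-Java sketch of that naming convention, with no Trino dependencies (the type names passed in are hypothetical placeholders, not real SPI calls):

```java
import java.util.List;
import java.util.StringJoiner;

public class UnionRowSketch
{
    // Mirrors the constants in UnionToRowCoercionUtils
    static final String TAG_NAME = "tag";
    static final String FIELD_PREFIX = "field";

    // Renders the row type that a union of the given branch types coerces to
    static String rowTypeFor(List<String> unionBranchTypes)
    {
        StringJoiner row = new StringJoiner(", ", "row(", ")");
        row.add(TAG_NAME + " tinyint"); // tag says which branch holds the value
        for (int i = 0; i < unionBranchTypes.size(); i++) {
            row.add(FIELD_PREFIX + i + " " + unionBranchTypes.get(i));
        }
        return row.toString();
    }

    public static void main(String[] args)
    {
        // An Avro union of [int, string] becomes a three-field row
        System.out.println(rowTypeFor(List.of("integer", "varchar")));
        // prints: row(tag tinyint, field0 integer, field1 varchar)
    }
}
```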
`lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFileReader.java` (177 additions, 0 deletions)

```java
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.hive.formats.avro;

import io.trino.filesystem.TrinoInputFile;
import io.trino.hive.formats.TrinoDataInputStream;
import io.trino.spi.Page;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Function;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;

public class AvroFileReader
        implements Closeable
{
    private final TrinoDataInputStream input;
    private final AvroPageDataReader dataReader;
    private final DataFileReader<Optional<Page>> fileReader;
    private Page nextPage;
    private final OptionalLong end;

    public AvroFileReader(
            TrinoInputFile inputFile,
            Schema schema,
            AvroTypeManager avroTypeManager)
            throws IOException, AvroTypeException
    {
        this(inputFile, schema, avroTypeManager, 0, OptionalLong.empty());
    }

    public AvroFileReader(
            TrinoInputFile inputFile,
            Schema schema,
            AvroTypeManager avroTypeManager,
            long offset,
            OptionalLong length)
            throws IOException, AvroTypeException
    {
        requireNonNull(inputFile, "inputFile is null");
        requireNonNull(schema, "schema is null");
        requireNonNull(avroTypeManager, "avroTypeManager is null");
        long fileSize = inputFile.length();

        verify(offset >= 0, "offset is negative");
        verify(offset < inputFile.length(), "offset is greater than data size");
        length.ifPresent(lengthLong -> verify(lengthLong >= 1, "length must be at least 1"));
        end = length.stream().map(l -> l + offset).findFirst();
        end.ifPresent(endLong -> verify(endLong <= fileSize, "offset plus length is greater than data size"));
        input = new TrinoDataInputStream(inputFile.newStream());
        dataReader = new AvroPageDataReader(schema, avroTypeManager);
        try {
            fileReader = new DataFileReader<>(new TrinoDataInputStreamAsAvroSeekableInput(input, fileSize), dataReader);
            fileReader.sync(offset);
        }
        catch (AvroPageDataReader.UncheckedAvroTypeException runtimeWrapper) {
            // Avro Datum Reader interface can't throw checked exceptions when initialized by the file reader,
            // so the exception is wrapped in a runtime exception that must be unwrapped
            throw runtimeWrapper.getAvroTypeException();
        }
        avroTypeManager.configure(fileReader.getMetaKeys().stream().collect(toImmutableMap(Function.identity(), fileReader::getMeta)));
    }

    public long getCompletedBytes()
    {
        return input.getReadBytes();
    }

    public long getReadTimeNanos()
    {
        return input.getReadTimeNanos();
    }

    public boolean hasNext()
            throws IOException
    {
        loadNextPageIfNecessary();
        return nextPage != null;
    }

    public Page next()
            throws IOException
    {
        if (!hasNext()) {
            throw new IOException("No more pages available from Avro file");
        }
        Page result = nextPage;
        nextPage = null;
        return result;
    }

    private void loadNextPageIfNecessary()
            throws IOException
    {
        while (nextPage == null && (end.isEmpty() || !fileReader.pastSync(end.getAsLong())) && fileReader.hasNext()) {
            try {
                nextPage = fileReader.next().orElse(null);
            }
            catch (AvroRuntimeException e) {
                throw new IOException(e);
            }
        }
        if (nextPage == null) {
            nextPage = dataReader.flush().orElse(null);
        }
    }

    @Override
    public void close()
            throws IOException
    {
        fileReader.close();
    }

    private record TrinoDataInputStreamAsAvroSeekableInput(TrinoDataInputStream inputStream, long fileSize)
            implements SeekableInput
    {
        TrinoDataInputStreamAsAvroSeekableInput
        {
            requireNonNull(inputStream, "inputStream is null");
        }

        @Override
        public void seek(long p)
                throws IOException
        {
            inputStream.seek(p);
        }

        @Override
        public long tell()
                throws IOException
        {
            return inputStream.getPos();
        }

        @Override
        public long length()
        {
            return fileSize;
        }

        @Override
        public int read(byte[] b, int off, int len)
                throws IOException
        {
            return inputStream.read(b, off, len);
        }

        @Override
        public void close()
                throws IOException
        {
            inputStream.close();
        }
    }
}
```
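The `offset`/`length` handling in this reader follows the usual sync-based split protocol for Avro container files: the constructor calls `sync(offset)` to seek to the first block boundary at or after the split's start, and `loadNextPageIfNecessary` keeps consuming blocks until `pastSync(end)` reports that the reader has moved past the split's end. A self-contained sketch of how that convention assigns each block to exactly one split (hypothetical block positions; a simplified model of Avro's exact boundary behavior, with no Avro or Trino dependencies):

```java
import java.util.List;

public class SplitReadSketch
{
    // Each Avro data block begins at a sync marker; in this simplified model a
    // split owns the blocks whose sync position falls inside [offset, end).
    static int blocksOwnedBySplit(List<Long> blockStarts, long offset, long end)
    {
        int owned = 0;
        for (long start : blockStarts) {
            if (start >= offset && start < end) { // analogous to !pastSync(end)
                owned++;
            }
        }
        return owned;
    }

    public static void main(String[] args)
    {
        List<Long> blockStarts = List.of(0L, 100L, 200L, 300L);
        // Two adjacent 200-byte splits cover every block exactly once
        System.out.println(blocksOwnedBySplit(blockStarts, 0, 200));   // prints 2
        System.out.println(blocksOwnedBySplit(blockStarts, 200, 400)); // prints 2
    }
}
```

The key property the sketch illustrates is that split boundaries can fall mid-block without any block being read twice or skipped, since ownership is decided by where a block starts, not where it ends.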
Review discussion:

- Reviewer: Do we really need this dependency? I would prefer if we did not need this in this module.
- jklamer: I'm using it for `io.trino.block.BlockAssertions`. Can I move that class to SPI? Or should I make my own version?
- Reviewer: I think we should move it to the SPI. When you do that, switch the TestNG `assertEquals` usage to AssertJ, since we're trying to move away from TestNG in new code.
- jklamer: Getting rolling IntelliJ errors on the refactor. I'll ping you offline on what you think we should do here.